Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/12/04 20:08:19 UTC

[GitHub] [kafka] wcarlson5 opened a new pull request #9695: KAFKA-10500: Remove thread

wcarlson5 opened a new pull request #9695:
URL: https://github.com/apache/kafka/pull/9695


   Add the ability to remove running threads
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] wcarlson5 commented on a change in pull request #9695: KAFKA-10500: Remove thread

Posted by GitBox <gi...@apache.org>.
wcarlson5 commented on a change in pull request #9695:
URL: https://github.com/apache/kafka/pull/9695#discussion_r553042805



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -992,7 +992,7 @@ private StreamThread createStreamThread(final long cacheSizePerThread, final int
         if (isRunningOrRebalancing()) {
             synchronized (changeThreadCount) {
                 for (final StreamThread streamThread : threads) {
-                    if (streamThread.isAlive()) {
+                    if (streamThread.isAlive() && (!streamThread.getName().equals(Thread.currentThread().getName()) || threads.size() == 1)) {

Review comment:
       We don't keep dead threads in this list. When a thread dies, we remove it from the list and update the metric that counts dead threads.







[GitHub] [kafka] wcarlson5 edited a comment on pull request #9695: KAFKA-10500: Remove thread

Posted by GitBox <gi...@apache.org>.
wcarlson5 edited a comment on pull request #9695:
URL: https://github.com/apache/kafka/pull/9695#issuecomment-742706985


   @cadonna 
   
   > In the KIP, it says that a rebalance is triggered even under static membership. I could not find any code that ensures that. Shall we do that in a separate PR?
   
   I had not seen that. I think it could complicate this PR more than necessary, so perhaps we should do it in another PR.
   
   > According to the KIP KafkaStreams#localThreadsMetadata() should not return metadata for dead stream threads. Should that also be part of this PR?
   
   We achieve this by removing threads from the thread list when they die.





[GitHub] [kafka] ableegoldman commented on a change in pull request #9695: KAFKA-10500: Remove thread

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #9695:
URL: https://github.com/apache/kafka/pull/9695#discussion_r537955392



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -924,22 +924,64 @@ private StreamThread createStreamThread(final long cacheSizePerThread, final int
      * @return name of the added stream thread or empty if a new stream thread could not be added
      */
     public Optional<String> addStreamThread() {
-        synchronized (changeThreadCount) {
-            if (isRunningOrRebalancing()) {
-                final int threadIdx = getNextThreadIndex();
-                final long cacheSizePerThread = getCacheSizePerThread(threads.size() + 1);
+        if (isRunningOrRebalancing()) {
+            final int threadIdx;
+            final long cacheSizePerThread;
+            synchronized (changeThreadCount) {
+                threadIdx = getNextThreadIndex();
+                cacheSizePerThread = getCacheSizePerThread(threads.size() + 1);
                 resizeThreadCache(cacheSizePerThread);
-                final StreamThread streamThread = createStreamThread(cacheSizePerThread, threadIdx);
-                synchronized (stateLock) {
-                    if (isRunningOrRebalancing()) {
-                        streamThread.start();
-                        return Optional.of(streamThread.getName());
-                    } else {
-                        streamThread.shutdown();
+            }
+            final StreamThread streamThread = createStreamThread(cacheSizePerThread, threadIdx);
+
+            synchronized (stateLock) {
+                if (isRunningOrRebalancing()) {
+                    streamThread.start();
+                    return Optional.of(streamThread.getName());
+                } else {
+                    streamThread.shutdown();
+                    threads.remove(streamThread);
+                    resizeThreadCache(getCacheSizePerThread(threads.size()));
+                    return Optional.empty();
+                }
+            }
+        }
+        return Optional.empty();
+    }
+
+    /**
+     * Removes one stream thread out of the running stream threads from this Kafka Streams client.
+     * <p>
+     * The removed stream thread is gracefully shut down. This method does not specify which stream
+     * thread is shut down.
+     * <p>
+     * Since the number of stream threads decreases, the sizes of the caches in the remaining stream
+     * threads are adapted so that the sum of the cache sizes over all stream threads equals the total
+     * cache size specified in configuration {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG}.
+     *
+     * @return name of the removed stream thread or empty if a stream thread could not be removed because
+     *         no stream threads are alive
+     */
+    public Optional<String> removeStreamThread() {
+        if (isRunningOrRebalancing()) {

Review comment:
       Same here, let's log a warning

##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -924,22 +924,64 @@ private StreamThread createStreamThread(final long cacheSizePerThread, final int
      * @return name of the added stream thread or empty if a new stream thread could not be added
      */
     public Optional<String> addStreamThread() {
-        synchronized (changeThreadCount) {
-            if (isRunningOrRebalancing()) {
-                final int threadIdx = getNextThreadIndex();
-                final long cacheSizePerThread = getCacheSizePerThread(threads.size() + 1);
+        if (isRunningOrRebalancing()) {
+            final int threadIdx;
+            final long cacheSizePerThread;
+            synchronized (changeThreadCount) {
+                threadIdx = getNextThreadIndex();
+                cacheSizePerThread = getCacheSizePerThread(threads.size() + 1);
                 resizeThreadCache(cacheSizePerThread);
-                final StreamThread streamThread = createStreamThread(cacheSizePerThread, threadIdx);
-                synchronized (stateLock) {
-                    if (isRunningOrRebalancing()) {
-                        streamThread.start();
-                        return Optional.of(streamThread.getName());
-                    } else {
-                        streamThread.shutdown();
+            }
+            final StreamThread streamThread = createStreamThread(cacheSizePerThread, threadIdx);
+
+            synchronized (stateLock) {
+                if (isRunningOrRebalancing()) {
+                    streamThread.start();
+                    return Optional.of(streamThread.getName());
+                } else {
+                    streamThread.shutdown();
+                    threads.remove(streamThread);
+                    resizeThreadCache(getCacheSizePerThread(threads.size()));
+                    return Optional.empty();
+                }
+            }
+        }
+        return Optional.empty();
+    }
+
+    /**
+     * Removes one stream thread out of the running stream threads from this Kafka Streams client.
+     * <p>
+     * The removed stream thread is gracefully shut down. This method does not specify which stream
+     * thread is shut down.
+     * <p>
+     * Since the number of stream threads decreases, the sizes of the caches in the remaining stream
+     * threads are adapted so that the sum of the cache sizes over all stream threads equals the total
+     * cache size specified in configuration {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG}.
+     *
+     * @return name of the removed stream thread or empty if a stream thread could not be removed because
+     *         no stream threads are alive
+     */
+    public Optional<String> removeStreamThread() {
+        if (isRunningOrRebalancing()) {
+            for (final StreamThread streamThread : threads) {

Review comment:
       We need to protect this with a lock or use a thread-safe data structure for `threads`; otherwise we can get a ConcurrentModificationException if the user calls `addStreamThread` and/or `removeStreamThread` at the same time (on that note, let's add test coverage for this)
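
A minimal sketch of the first option, guarding both the iteration and the mutation with one lock. The class `GuardedThreadList` and its methods are hypothetical stand-ins for illustration, with plain strings in place of stream threads; this is not the code that was merged.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Hypothetical stand-in for the client's thread bookkeeping, not the Kafka Streams code.
class GuardedThreadList {
    private final Object changeThreadCount = new Object();
    private final List<String> threads = new ArrayList<>();

    void add(final String name) {
        synchronized (changeThreadCount) {
            threads.add(name);
        }
    }

    // Selects and removes one entry while holding the same lock that add() takes,
    // so no other caller can modify the list during the iteration.
    Optional<String> removeAny() {
        synchronized (changeThreadCount) {
            for (final String name : threads) {
                threads.remove(name);
                // Return immediately: the iterator must not be used again after the removal.
                return Optional.of(name);
            }
            return Optional.empty();
        }
    }

    int size() {
        synchronized (changeThreadCount) {
            return threads.size();
        }
    }
}
```

Because every access goes through `changeThreadCount`, concurrent callers of `add` and `removeAny` are serialized and the fail-fast iterator never observes a foreign modification.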

##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -924,28 +924,69 @@ private StreamThread createStreamThread(final long cacheSizePerThread, final int
      * @return name of the added stream thread or empty if a new stream thread could not be added
      */
     public Optional<String> addStreamThread() {
-        synchronized (changeThreadCount) {
-            if (isRunningOrRebalancing()) {
-                final int threadIdx = getNextThreadIndex();
-                final long cacheSizePerThread = getCacheSizePerThread(threads.size() + 1);
+        if (isRunningOrRebalancing()) {
+            final int threadIdx;
+            final long cacheSizePerThread;
+            synchronized (changeThreadCount) {
+                threadIdx = getNextThreadIndex();
+                cacheSizePerThread = getCacheSizePerThread(threads.size() + 1);
                 resizeThreadCache(cacheSizePerThread);
-                final StreamThread streamThread = createStreamThread(cacheSizePerThread, threadIdx);
-                synchronized (stateLock) {
-                    if (isRunningOrRebalancing()) {
-                        streamThread.start();
-                        return Optional.of(streamThread.getName());
-                    } else {
-                        streamThread.shutdown();
+            }
+            final StreamThread streamThread = createStreamThread(cacheSizePerThread, threadIdx);
+
+            synchronized (stateLock) {
+                if (isRunningOrRebalancing()) {
+                    streamThread.start();
+                    return Optional.of(streamThread.getName());
+                } else {
+                    streamThread.shutdown();
+                    threads.remove(streamThread);
+                    resizeThreadCache(getCacheSizePerThread(threads.size()));
+                    return Optional.empty();
+                }
+            }
+        }
+        return Optional.empty();
+    }
+
+    /**
+     * Removes one stream thread out of the running stream threads from this Kafka Streams client.
+     *
+     * The removed stream thread is gracefully shut down. This method does not specify which stream
+     * thread is shut down.
+     *
+     * Since the number of stream threads decreases, the sizes of the caches in the remaining stream
+     * threads are adapted so that the sum of the cache sizes over all stream threads equals the total
+     * cache size specified in configuration {@code cache.max.bytes.buffering}.
+     *
+     * @return name of the removed stream thread or empty if a stream thread could not be removed because
+     *         no stream threads are alive
+     */
+    public Optional<String> removeStreamThread() {
+        if (isRunningOrRebalancing()) {
+            for (final StreamThread streamThread : threads) {
+                if (streamThread.isAlive()) {
+                    streamThread.shutdown();
+                    while (streamThread.state() != StreamThread.State.DEAD && !streamThread.getName().equals(Thread.currentThread().getName())) {

Review comment:
       I take it the `!streamThread.getName().equals(Thread.currentThread().getName())` is in preparation for adding the `REPLACE_THREAD` enum -- if so, can you just leave a `//TODO` here for now and add this in the followup PR so we have relevant changes reviewed together?

##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -924,22 +924,64 @@ private StreamThread createStreamThread(final long cacheSizePerThread, final int
      * @return name of the added stream thread or empty if a new stream thread could not be added
      */
     public Optional<String> addStreamThread() {
-        synchronized (changeThreadCount) {
-            if (isRunningOrRebalancing()) {
-                final int threadIdx = getNextThreadIndex();
-                final long cacheSizePerThread = getCacheSizePerThread(threads.size() + 1);
+        if (isRunningOrRebalancing()) {
+            final int threadIdx;
+            final long cacheSizePerThread;
+            synchronized (changeThreadCount) {
+                threadIdx = getNextThreadIndex();
+                cacheSizePerThread = getCacheSizePerThread(threads.size() + 1);
                 resizeThreadCache(cacheSizePerThread);
-                final StreamThread streamThread = createStreamThread(cacheSizePerThread, threadIdx);
-                synchronized (stateLock) {
-                    if (isRunningOrRebalancing()) {
-                        streamThread.start();
-                        return Optional.of(streamThread.getName());
-                    } else {
-                        streamThread.shutdown();
+            }
+            final StreamThread streamThread = createStreamThread(cacheSizePerThread, threadIdx);
+
+            synchronized (stateLock) {
+                if (isRunningOrRebalancing()) {
+                    streamThread.start();
+                    return Optional.of(streamThread.getName());
+                } else {
+                    streamThread.shutdown();
+                    threads.remove(streamThread);
+                    resizeThreadCache(getCacheSizePerThread(threads.size()));
+                    return Optional.empty();
+                }
+            }
+        }
+        return Optional.empty();
+    }
+
+    /**
+     * Removes one stream thread out of the running stream threads from this Kafka Streams client.
+     * <p>
+     * The removed stream thread is gracefully shut down. This method does not specify which stream
+     * thread is shut down.
+     * <p>
+     * Since the number of stream threads decreases, the sizes of the caches in the remaining stream
+     * threads are adapted so that the sum of the cache sizes over all stream threads equals the total
+     * cache size specified in configuration {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG}.
+     *
+     * @return name of the removed stream thread or empty if a stream thread could not be removed because
+     *         no stream threads are alive
+     */
+    public Optional<String> removeStreamThread() {
+        if (isRunningOrRebalancing()) {
+            for (final StreamThread streamThread : threads) {
+                if (streamThread.isAlive()) {
+                    streamThread.shutdown();
+                    while (streamThread.state() != StreamThread.State.DEAD && !streamThread.getName().equals(Thread.currentThread().getName())) {
+                        try {
+                            synchronized (streamThread.state()) {
+                                streamThread.state().wait(100);
+                            }
+                        } catch (final InterruptedException e) {
+                            e.printStackTrace();

Review comment:
       Be careful about swallowing the InterruptedException completely. Getting this exception means the user wants the thread to stop, not just this specific method. We also don't know what the caller of `removeStreamThread` looks like; it might just be an infinite loop that checks some metric and adjusts the thread count if necessary. We should make sure to propagate the interrupt once we finish cleaning up after the thread.
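
One common way to propagate the interrupt is to restore the caller's interrupt flag in the `catch` block. The sketch below assumes a `CountDownLatch` as a stand-in for "the removed thread has finished"; `InterruptAwareWait` and `awaitShutdown` are hypothetical names, not part of the PR.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

class InterruptAwareWait {
    // Waits until the latch reaches zero or the timeout elapses.
    // Returns true only if the latch reached zero.
    static boolean awaitShutdown(final CountDownLatch done, final long timeoutMs) {
        try {
            return done.await(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (final InterruptedException e) {
            // Any clean-up for the removed thread would go here (hypothetical).
            // Restore the flag instead of swallowing it, so the caller can see the interrupt.
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

A caller that loops and adjusts the thread count can then check `Thread.currentThread().isInterrupted()` after the call and exit its loop.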

##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -924,28 +924,69 @@ private StreamThread createStreamThread(final long cacheSizePerThread, final int
      * @return name of the added stream thread or empty if a new stream thread could not be added
      */
     public Optional<String> addStreamThread() {
-        synchronized (changeThreadCount) {

Review comment:
       Nice. One minor suggestion: log a warning if the client isn't running (or rebalancing) and print the current state







[GitHub] [kafka] ableegoldman commented on a change in pull request #9695: KAFKA-10500: Remove thread

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #9695:
URL: https://github.com/apache/kafka/pull/9695#discussion_r553005186



##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java
##########
@@ -97,15 +99,132 @@ public void shouldAddStreamThread() throws Exception {
 
             final Optional<String> name = kafkaStreams.addStreamThread();
 
-            assertThat(name, CoreMatchers.not(Optional.empty()));
+            assertThat(name, not(Optional.empty()));
             TestUtils.waitForCondition(
                 () -> kafkaStreams.localThreadsMetadata().stream().sequential()
                         .map(ThreadMetadata::threadName).anyMatch(t -> t.equals(name.orElse(""))),
                 "Wait for the thread to be added"
             );
             assertThat(kafkaStreams.localThreadsMetadata().size(), equalTo(oldThreadCount + 1));
-            assertThat(kafkaStreams.localThreadsMetadata().stream().map(t -> t.threadName().split("-StreamThread-")[1]).sorted().toArray(), equalTo(new String[] {"1", "2", "3"}));
-            TestUtils.waitForCondition(() -> kafkaStreams.state() == KafkaStreams.State.RUNNING, "wait for running");
+            assertThat(
+                kafkaStreams
+                    .localThreadsMetadata()
+                    .stream()
+                    .map(t -> t.threadName().split("-StreamThread-")[1])
+                    .sorted().toArray(),
+                equalTo(new String[] {"1", "2", "3"})
+            );
+            TestUtils.waitForCondition(
+                () -> kafkaStreams.state() == KafkaStreams.State.RUNNING,
+                "Kafka Streams client did not reach state RUNNING"
+            );
+        }
+    }
+
+    @Test
+    public void shouldRemoveStreamThread() throws Exception {
+        try (final KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), properties)) {
+            StreamsTestUtils.startKafkaStreamsAndWaitForRunningState(kafkaStreams);
+            final int oldThreadCount = kafkaStreams.localThreadsMetadata().size();
+            assertThat(kafkaStreams.removeStreamThread().get().split("-")[0], equalTo(appId));
+            assertThat(kafkaStreams.localThreadsMetadata().size(), equalTo(oldThreadCount - 1));
+        }
+    }
+
+    @Test
+    public void shouldAddAndRemoveThreads() throws InterruptedException {
+        try (final KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), properties)) {
+            StreamsTestUtils.startKafkaStreamsAndWaitForRunningState(kafkaStreams);
+            final int oldThreadCount = kafkaStreams.localThreadsMetadata().size();
+            final CountDownLatch latch = new CountDownLatch(2);
+            final Thread one = adjustCountHelperThread(kafkaStreams, 4, latch);
+            final Thread two = adjustCountHelperThread(kafkaStreams, 6, latch);
+            two.start();
+            one.start();
+            latch.await(30, TimeUnit.SECONDS);
+            assertThat(kafkaStreams.localThreadsMetadata().size(), equalTo(oldThreadCount));
+        }

Review comment:
       Awesome, thanks for adding this test. One small suggestion would be to wait for the client to get back to RUNNING at the end, so we can verify that everything did go smoothly with the add/remove. I think this would be good to do in all of these tests, actually
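
The tests in this diff already use `TestUtils.waitForCondition` for this kind of check. To illustrate the idea behind such a wait, here is a simplified polling helper; `WaitFor` is a hypothetical class written for this note and is not Kafka's test utility.

```java
import java.util.function.BooleanSupplier;

class WaitFor {
    // Polls the condition until it holds or maxWaitMs has elapsed.
    // Returns true if the condition held, false on timeout or interrupt.
    static boolean condition(final BooleanSupplier condition, final long maxWaitMs) {
        final long deadline = System.nanoTime() + maxWaitMs * 1_000_000L;
        while (!condition.getAsBoolean()) {
            if (System.nanoTime() - deadline >= 0) {
                return false;
            }
            try {
                Thread.sleep(10);
            } catch (final InterruptedException e) {
                Thread.currentThread().interrupt();  // keep the interrupt visible to the caller
                return false;
            }
        }
        return true;
    }
}
```

In a test, the supplier would be a check such as "the client state equals RUNNING", evaluated after the add and remove calls have returned.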







[GitHub] [kafka] ableegoldman commented on a change in pull request #9695: KAFKA-10500: Remove thread

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #9695:
URL: https://github.com/apache/kafka/pull/9695#discussion_r538911146



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -924,22 +924,64 @@ private StreamThread createStreamThread(final long cacheSizePerThread, final int
      * @return name of the added stream thread or empty if a new stream thread could not be added
      */
     public Optional<String> addStreamThread() {
-        synchronized (changeThreadCount) {
-            if (isRunningOrRebalancing()) {
-                final int threadIdx = getNextThreadIndex();
-                final long cacheSizePerThread = getCacheSizePerThread(threads.size() + 1);
+        if (isRunningOrRebalancing()) {
+            final int threadIdx;
+            final long cacheSizePerThread;
+            synchronized (changeThreadCount) {
+                threadIdx = getNextThreadIndex();
+                cacheSizePerThread = getCacheSizePerThread(threads.size() + 1);
                 resizeThreadCache(cacheSizePerThread);
-                final StreamThread streamThread = createStreamThread(cacheSizePerThread, threadIdx);
-                synchronized (stateLock) {
-                    if (isRunningOrRebalancing()) {
-                        streamThread.start();
-                        return Optional.of(streamThread.getName());
-                    } else {
-                        streamThread.shutdown();
+            }
+            final StreamThread streamThread = createStreamThread(cacheSizePerThread, threadIdx);
+
+            synchronized (stateLock) {
+                if (isRunningOrRebalancing()) {
+                    streamThread.start();
+                    return Optional.of(streamThread.getName());
+                } else {
+                    streamThread.shutdown();
+                    threads.remove(streamThread);
+                    resizeThreadCache(getCacheSizePerThread(threads.size()));
+                    return Optional.empty();
+                }
+            }
+        }
+        return Optional.empty();
+    }
+
+    /**
+     * Removes one stream thread out of the running stream threads from this Kafka Streams client.
+     * <p>
+     * The removed stream thread is gracefully shut down. This method does not specify which stream
+     * thread is shut down.
+     * <p>
+     * Since the number of stream threads decreases, the sizes of the caches in the remaining stream
+     * threads are adapted so that the sum of the cache sizes over all stream threads equals the total
+     * cache size specified in configuration {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG}.
+     *
+     * @return name of the removed stream thread or empty if a stream thread could not be removed because
+     *         no stream threads are alive
+     */
+    public Optional<String> removeStreamThread() {
+        if (isRunningOrRebalancing()) {
+            for (final StreamThread streamThread : threads) {

Review comment:
       Where is it synchronized? I'm thinking that, e.g., while one thread iterates through this list, another thread may be inside the `synchronized (changeThreadCount)` block where it removes from this list. Only the second access is synchronized, so the iteration itself is still vulnerable, right?
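
The hazard can be reproduced without a second thread, because `ArrayList` iterators are fail-fast: any structural change made while a for-each loop is running makes the next iterator step throw. The sketch below shows that, plus one possible remedy (iterating over a snapshot taken under the lock). `IterationHazard` and its methods are hypothetical names, and strings stand in for stream threads.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.ConcurrentModificationException;
import java.util.List;

class IterationHazard {
    // Removes an element while a for-each loop over the same list is still running.
    // Returns true if the fail-fast iterator detected the modification.
    static boolean modifyDuringIteration() {
        final List<String> threads = new ArrayList<>(Arrays.asList("t-1", "t-2", "t-3"));
        try {
            for (final String name : threads) {
                threads.remove(name);  // stands in for a removal made by another caller
            }
            return false;
        } catch (final ConcurrentModificationException e) {
            return true;
        }
    }

    // Copies the list under the lock and iterates over the copy,
    // so removals from the original list cannot disturb the loop.
    static int removeAllViaSnapshot(final List<String> threads, final Object lock) {
        final List<String> snapshot;
        synchronized (lock) {
            snapshot = new ArrayList<>(threads);
        }
        int visited = 0;
        for (final String name : snapshot) {
            synchronized (lock) {
                threads.remove(name);
            }
            visited++;
        }
        return visited;
    }
}
```

The snapshot trades a small copy for a shorter critical section; holding the lock for the whole loop is the simpler alternative when the loop body is cheap.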







[GitHub] [kafka] wcarlson5 commented on a change in pull request #9695: KAFKA-10500: Remove thread

Posted by GitBox <gi...@apache.org>.
wcarlson5 commented on a change in pull request #9695:
URL: https://github.com/apache/kafka/pull/9695#discussion_r536354418



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -924,28 +924,69 @@ private StreamThread createStreamThread(final long cacheSizePerThread, final int
      * @return name of the added stream thread or empty if a new stream thread could not be added
      */
     public Optional<String> addStreamThread() {
-        synchronized (changeThreadCount) {

Review comment:
       I moved the changeThreadCount lock to be more specific so we will not hold a lock while acquiring a new one.
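
The locking pattern described here can be sketched in isolation: take the first lock only for the bookkeeping step, release it, and then take the second lock. `NarrowLockScope`, `addWorker`, and `stop` are hypothetical names for illustration; the real method also creates and starts a stream thread.

```java
import java.util.Optional;

class NarrowLockScope {
    private final Object changeThreadCount = new Object();
    private final Object stateLock = new Object();
    private int nextIndex = 1;
    private boolean running = true;

    Optional<String> addWorker() {
        final int index;
        synchronized (changeThreadCount) {
            index = nextIndex++;  // bookkeeping only
        }
        // changeThreadCount has been released before stateLock is acquired,
        // so this method never holds both locks at once.
        synchronized (stateLock) {
            return running ? Optional.of("worker-" + index) : Optional.empty();
        }
    }

    void stop() {
        synchronized (stateLock) {
            running = false;
        }
    }
}
```

Never holding two locks at the same time removes this method from any lock-ordering cycle, at the cost that the state can change between the two critical sections. That is why the diff re-checks `isRunningOrRebalancing()` inside the `stateLock` block.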

##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -924,28 +924,69 @@ private StreamThread createStreamThread(final long cacheSizePerThread, final int
      * @return name of the added stream thread or empty if a new stream thread could not be added
      */
     public Optional<String> addStreamThread() {
-        synchronized (changeThreadCount) {
-            if (isRunningOrRebalancing()) {
-                final int threadIdx = getNextThreadIndex();
-                final long cacheSizePerThread = getCacheSizePerThread(threads.size() + 1);
+        if (isRunningOrRebalancing()) {
+            final int threadIdx;
+            final long cacheSizePerThread;
+            synchronized (changeThreadCount) {
+                threadIdx = getNextThreadIndex();
+                cacheSizePerThread = getCacheSizePerThread(threads.size() + 1);
                 resizeThreadCache(cacheSizePerThread);
-                final StreamThread streamThread = createStreamThread(cacheSizePerThread, threadIdx);
-                synchronized (stateLock) {
-                    if (isRunningOrRebalancing()) {
-                        streamThread.start();
-                        return Optional.of(streamThread.getName());
-                    } else {
-                        streamThread.shutdown();
+            }
+            final StreamThread streamThread = createStreamThread(cacheSizePerThread, threadIdx);
+
+            synchronized (stateLock) {
+                if (isRunningOrRebalancing()) {
+                    streamThread.start();
+                    return Optional.of(streamThread.getName());
+                } else {
+                    streamThread.shutdown();
+                    threads.remove(streamThread);
+                    resizeThreadCache(getCacheSizePerThread(threads.size()));
+                    return Optional.empty();
+                }
+            }
+        }
+        return Optional.empty();
+    }
+
+    /**
+     * Removes one stream thread out of the running stream threads from this Kafka Streams client.
+     *
+     * The removed stream thread is gracefully shut down. This method does not specify which stream
+     * thread is shut down.
+     *
+     * Since the number of stream threads decreases, the sizes of the caches in the remaining stream
+     * threads are adapted so that the sum of the cache sizes over all stream threads equals the total
+     * cache size specified in configuration {@code cache.max.bytes.buffering}.
+     *
+     * @return name of the removed stream thread or empty if a stream thread could not be removed because
+     *         no stream threads are alive
+     */
+    public Optional<String> removeStreamThread() {
+        if (isRunningOrRebalancing()) {
+            for (final StreamThread streamThread : threads) {
+                if (streamThread.isAlive()) {
+                    streamThread.shutdown();
+                    while (streamThread.state() != StreamThread.State.DEAD && !streamThread.getName().equals(Thread.currentThread().getName())) {

Review comment:
       removeStreamThread is supposed to be a blocking call until the shutdown is complete
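
A blocking shutdown of this kind can be sketched with plain `Thread.join`. In the sketch, `interrupt()` stands in for a graceful shutdown request, and the self-check reflects that a thread waiting for its own termination would never return. `BlockingShutdown` and `stopAndWait` are hypothetical names, not the PR's implementation.

```java
class BlockingShutdown {
    // Asks the worker to stop and blocks until it has terminated or the timeout elapses.
    // Returns true if the worker is no longer alive when the method returns.
    static boolean stopAndWait(final Thread worker, final long timeoutMs) {
        worker.interrupt();  // stand-in for a graceful shutdown request
        if (worker == Thread.currentThread()) {
            // A thread cannot wait for its own termination; skip the wait.
            return false;
        }
        try {
            worker.join(timeoutMs);
        } catch (final InterruptedException e) {
            Thread.currentThread().interrupt();  // propagate the caller's interrupt
        }
        return !worker.isAlive();
    }
}
```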

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java
##########
@@ -108,4 +108,55 @@ public void shouldAddStreamThread() throws Exception {
             TestUtils.waitForCondition(() -> kafkaStreams.state() == KafkaStreams.State.RUNNING, "wait for running");
         }
     }
+
+    @Test
+    public void shouldRemoveStreamThread() throws Exception {
+        try (final KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), properties)) {
+            StreamsTestUtils.startKafkaStreamsAndWaitForRunningState(kafkaStreams);
+            final int oldThreadCount = kafkaStreams.localThreadsMetadata().size();
+            kafkaStreams.removeStreamThread();
+            assertThat(kafkaStreams.localThreadsMetadata().size(), equalTo(oldThreadCount - 1));
+        }
+    }
+
+    @Test
+    public void shouldAddAndRemoveStreamThreadsWhileKeepingNamesCorrect() throws Exception {

Review comment:
       This makes sure that the names are reused correctly and that continuously adding and removing threads does not cause problems







[GitHub] [kafka] ableegoldman commented on pull request #9695: KAFKA-10500: Remove thread

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on pull request #9695:
URL: https://github.com/apache/kafka/pull/9695#issuecomment-758208639


   Merged to trunk





[GitHub] [kafka] cadonna commented on a change in pull request #9695: KAFKA-10500: Remove thread

Posted by GitBox <gi...@apache.org>.
cadonna commented on a change in pull request #9695:
URL: https://github.com/apache/kafka/pull/9695#discussion_r540860759



##########
File path: streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
##########
@@ -631,6 +634,23 @@ public void shouldNotAddThreadWhenError() {
         assertThat(streams.threads.size(), equalTo(oldSize));
     }
 
+    @Test
+    public void shouldRemoveThread() throws InterruptedException {
+        props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2);
+        final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time);
+        streams.start();
+        final int oldSize = streams.threads.size();
+        TestUtils.waitForCondition(() -> streams.state() == KafkaStreams.State.RUNNING, 15L, "wait until running");
+        assertThat(streams.removeStreamThread(), equalTo(Optional.of("newThread")));
+        assertThat(streams.threads.size(), equalTo(oldSize - 1));
+    }
+
+    @Test
+    public void shouldNotRemoveThreadWhenNotRunning() {
+        props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1);
+        final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time);
+        assertThat(streams.removeStreamThread(), equalTo(Optional.empty()));
+    }
 

Review comment:
       Fair enough! Still, we also need to test the case where a thread is not alive, i.e., `thread.isAlive() == false`.




----------------------------------------------------------------



[GitHub] [kafka] cadonna commented on a change in pull request #9695: KAFKA-10500: Remove thread

Posted by GitBox <gi...@apache.org>.
cadonna commented on a change in pull request #9695:
URL: https://github.com/apache/kafka/pull/9695#discussion_r540854106



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -924,25 +924,64 @@ private StreamThread createStreamThread(final long cacheSizePerThread, final int
      * @return name of the added stream thread or empty if a new stream thread could not be added
      */
     public Optional<String> addStreamThread() {
-        synchronized (changeThreadCount) {
-            if (isRunningOrRebalancing()) {
-                final int threadIdx = getNextThreadIndex();
-                final long cacheSizePerThread = getCacheSizePerThread(threads.size() + 1);
+        if (isRunningOrRebalancing()) {
+            final int threadIdx;
+            final long cacheSizePerThread;
+            synchronized (changeThreadCount) {
+                threadIdx = getNextThreadIndex();
+                cacheSizePerThread = getCacheSizePerThread(threads.size() + 1);
                 resizeThreadCache(cacheSizePerThread);
-                final StreamThread streamThread = createStreamThread(cacheSizePerThread, threadIdx);
-                synchronized (stateLock) {
-                    if (isRunningOrRebalancing()) {
-                        streamThread.start();
-                        return Optional.of(streamThread.getName());
-                    } else {
-                        streamThread.shutdown();
+            }
+            final StreamThread streamThread = createStreamThread(cacheSizePerThread, threadIdx);
+
+            synchronized (stateLock) {
+                if (isRunningOrRebalancing()) {
+                    streamThread.start();
+                    return Optional.of(streamThread.getName());
+                } else {
+                    streamThread.shutdown();
+                    threads.remove(streamThread);
+                    resizeThreadCache(getCacheSizePerThread(threads.size()));
+                    log.warn("Cannot add a stream thread in state " + state());
+                    return Optional.empty();
+                }
+            }
+        }
+        log.warn("Cannot add a stream thread in state " + state());
+        return Optional.empty();
+    }
+
+    /**
+     * Removes one stream thread out of the running stream threads from this Kafka Streams client.
+     * <p>
+     * The removed stream thread is gracefully shut down. This method does not specify which stream
+     * thread is shut down.
+     * <p>
+     * Since the number of stream threads decreases, the sizes of the caches in the remaining stream
+     * threads are adapted so that the sum of the cache sizes over all stream threads equals the total
+     * cache size specified in configuration {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG}.
+     *
+     * @return name of the removed stream thread or empty if a stream thread could not be removed because
+     *         no stream threads are alive
+     */
+    public Optional<String> removeStreamThread() {
+        if (isRunningOrRebalancing()) {
+            for (final StreamThread streamThread : threads) {
+                if (streamThread.isAlive()) {
+                    streamThread.shutdown();
+                    if (!streamThread.getName().equals(Thread.currentThread().getName())) {

Review comment:
       That would still work, but not if the non-global stream threads are removed by a stream thread. I can think of a use case where a stream thread is removed by another stream thread when a special record is processed. Do we want to support such a use case?




----------------------------------------------------------------



[GitHub] [kafka] ableegoldman commented on a change in pull request #9695: KAFKA-10500: Remove thread

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #9695:
URL: https://github.com/apache/kafka/pull/9695#discussion_r553078581



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -992,7 +992,7 @@ private StreamThread createStreamThread(final long cacheSizePerThread, final int
         if (isRunningOrRebalancing()) {
             synchronized (changeThreadCount) {
                 for (final StreamThread streamThread : threads) {
-                    if (streamThread.isAlive()) {
+                    if (streamThread.isAlive() && (!streamThread.getName().equals(Thread.currentThread().getName()) || threads.size() == 1)) {

Review comment:
       Then why check `isAlive()` here at all? (I'm not necessarily requesting changes here, just trying to understand)




----------------------------------------------------------------



[GitHub] [kafka] wcarlson5 commented on a change in pull request #9695: KAFKA-10500: Remove thread

Posted by GitBox <gi...@apache.org>.
wcarlson5 commented on a change in pull request #9695:
URL: https://github.com/apache/kafka/pull/9695#discussion_r553011872



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -947,24 +948,65 @@ private StreamThread createStreamThread(final long cacheSizePerThread, final int
      * @return name of the added stream thread or empty if a new stream thread could not be added
      */
     public Optional<String> addStreamThread() {
-        synchronized (changeThreadCount) {
-            if (isRunningOrRebalancing()) {
-                final int threadIdx = getNextThreadIndex();
-                final long cacheSizePerThread = getCacheSizePerThread(threads.size() + 1);
+        if (isRunningOrRebalancing()) {
+            final int threadIdx;
+            final long cacheSizePerThread;
+            synchronized (changeThreadCount) {
+                threadIdx = getNextThreadIndex();
+                cacheSizePerThread = getCacheSizePerThread(threads.size() + 1);
                 resizeThreadCache(cacheSizePerThread);
-                final StreamThread streamThread = createStreamThread(cacheSizePerThread, threadIdx);
-                synchronized (stateLock) {
-                    if (isRunningOrRebalancing()) {
-                        streamThread.start();
-                        return Optional.of(streamThread.getName());
-                    } else {
+            }
+            final StreamThread streamThread = createStreamThread(cacheSizePerThread, threadIdx);
+
+            synchronized (stateLock) {
+                if (isRunningOrRebalancing()) {
+                    streamThread.start();
+                    return Optional.of(streamThread.getName());
+                } else {
+                    streamThread.shutdown();
+                    threads.remove(streamThread);
+                    resizeThreadCache(getCacheSizePerThread(threads.size()));
+                    log.warn("Cannot add a stream thread in state " + state());
+                    return Optional.empty();
+                }
+            }
+        }
+        log.warn("Cannot add a stream thread in state " + state());
+        return Optional.empty();
+    }
+
+    /**
+     * Removes one stream thread out of the running stream threads from this Kafka Streams client.
+     * <p>
+     * The removed stream thread is gracefully shut down. This method does not specify which stream
+     * thread is shut down.
+     * <p>
+     * Since the number of stream threads decreases, the sizes of the caches in the remaining stream
+     * threads are adapted so that the sum of the cache sizes over all stream threads equals the total
+     * cache size specified in configuration {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG}.
+     *
+     * @return name of the removed stream thread or empty if a stream thread could not be removed because
+     *         no stream threads are alive
+     */
+    public Optional<String> removeStreamThread() {
+        if (isRunningOrRebalancing()) {
+            synchronized (changeThreadCount) {
+                for (final StreamThread streamThread : threads) {
+                    if (streamThread.isAlive()) {
                         streamThread.shutdown();
+                        if (!streamThread.getName().equals(Thread.currentThread().getName())) {

Review comment:
       Well, there is the issue of the cache resize: we wait until the thread is dead before changing the cache size. We cannot do that when a thread removes itself, which makes an OOM possible (https://github.com/apache/kafka/pull/9695#discussion_r538921676). So I think it would be best to avoid having a thread remove itself if possible, unless it is the last thread.
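
       The selection rule described here can be sketched as follows, assuming the candidate list contains only live threads and threads are identified by name. `RemovalCandidateSketch` and `pickThreadToRemove` are hypothetical names for illustration, not part of the PR.

```java
import java.util.List;
import java.util.Optional;

final class RemovalCandidateSketch {

    // Hypothetical helper: prefer any live thread other than the caller, so the
    // caller can wait for it to die before the cache is resized. Fall back to the
    // caller itself only when it is the single remaining thread.
    static Optional<String> pickThreadToRemove(final List<String> aliveThreadNames,
                                               final String callerName) {
        for (final String name : aliveThreadNames) {
            final boolean isCaller = name.equals(callerName);
            if (!isCaller || aliveThreadNames.size() == 1) {
                return Optional.of(name);
            }
        }
        return Optional.empty(); // nothing to remove
    }
}
```

       A caller that is not a stream thread never matches any name, so it always gets the first live thread.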




----------------------------------------------------------------



[GitHub] [kafka] ableegoldman commented on a change in pull request #9695: KAFKA-10500: Remove thread

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #9695:
URL: https://github.com/apache/kafka/pull/9695#discussion_r553024443



##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java
##########
@@ -128,6 +133,7 @@ public void shouldRemoveStreamThread() throws Exception {
             final int oldThreadCount = kafkaStreams.localThreadsMetadata().size();
             assertThat(kafkaStreams.removeStreamThread().get().split("-")[0], equalTo(appId));
             assertThat(kafkaStreams.localThreadsMetadata().size(), equalTo(oldThreadCount - 1));
+            waitForApplicationState(Collections.singletonList(kafkaStreams), KafkaStreams.State.RUNNING, DEFAULT_DURATION);

Review comment:
       Ah, sorry I didn't think of this/mention it before, but I think we actually need to wait for a _transition_ to RUNNING, and not just for it to be in the state itself. It probably takes a little while after removing a thread for the rebalance to occur, so it's probably already in RUNNING. Pretty sure there's some other integration test util that watches for the REBALANCING -> RUNNING transition, though
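
       A minimal sketch of waiting for the transition rather than the state, assuming a listener that is called with the same argument order as `KafkaStreams.StateListener#onChange(newState, oldState)`. The class `TransitionWaiterSketch` and its local `State` enum are hypothetical stand-ins, and this is not the existing integration test util mentioned above.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

final class TransitionWaiterSketch {

    // Stand-in for KafkaStreams.State; only the two states discussed here.
    enum State { REBALANCING, RUNNING }

    private final CountDownLatch sawTransition = new CountDownLatch(1);

    // Called on every state change; only REBALANCING -> RUNNING releases the latch.
    void onChange(final State newState, final State oldState) {
        if (oldState == State.REBALANCING && newState == State.RUNNING) {
            sawTransition.countDown();
        }
    }

    // Returns true only if the transition was observed within the timeout.
    boolean await(final long timeoutMs) {
        try {
            return sawTransition.await(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (final InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return false;
        }
    }
}
```

       Because the latch only opens on an observed change, a client that is still in RUNNING from before the removal does not satisfy the wait. The listener has to be registered before the removal is triggered, otherwise the transition could be missed.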




----------------------------------------------------------------



[GitHub] [kafka] ableegoldman commented on a change in pull request #9695: KAFKA-10500: Remove thread

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #9695:
URL: https://github.com/apache/kafka/pull/9695#discussion_r553003053



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -947,24 +948,65 @@ private StreamThread createStreamThread(final long cacheSizePerThread, final int
      * @return name of the added stream thread or empty if a new stream thread could not be added
      */
     public Optional<String> addStreamThread() {
-        synchronized (changeThreadCount) {
-            if (isRunningOrRebalancing()) {
-                final int threadIdx = getNextThreadIndex();
-                final long cacheSizePerThread = getCacheSizePerThread(threads.size() + 1);
+        if (isRunningOrRebalancing()) {
+            final int threadIdx;
+            final long cacheSizePerThread;
+            synchronized (changeThreadCount) {
+                threadIdx = getNextThreadIndex();
+                cacheSizePerThread = getCacheSizePerThread(threads.size() + 1);
                 resizeThreadCache(cacheSizePerThread);
-                final StreamThread streamThread = createStreamThread(cacheSizePerThread, threadIdx);
-                synchronized (stateLock) {
-                    if (isRunningOrRebalancing()) {
-                        streamThread.start();
-                        return Optional.of(streamThread.getName());
-                    } else {
+            }
+            final StreamThread streamThread = createStreamThread(cacheSizePerThread, threadIdx);
+
+            synchronized (stateLock) {
+                if (isRunningOrRebalancing()) {
+                    streamThread.start();
+                    return Optional.of(streamThread.getName());
+                } else {
+                    streamThread.shutdown();
+                    threads.remove(streamThread);
+                    resizeThreadCache(getCacheSizePerThread(threads.size()));
+                    log.warn("Cannot add a stream thread in state " + state());
+                    return Optional.empty();
+                }
+            }
+        }
+        log.warn("Cannot add a stream thread in state " + state());
+        return Optional.empty();
+    }
+
+    /**
+     * Removes one stream thread out of the running stream threads from this Kafka Streams client.
+     * <p>
+     * The removed stream thread is gracefully shut down. This method does not specify which stream
+     * thread is shut down.
+     * <p>
+     * Since the number of stream threads decreases, the sizes of the caches in the remaining stream
+     * threads are adapted so that the sum of the cache sizes over all stream threads equals the total
+     * cache size specified in configuration {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG}.
+     *
+     * @return name of the removed stream thread or empty if a stream thread could not be removed because
+     *         no stream threads are alive
+     */
+    public Optional<String> removeStreamThread() {
+        if (isRunningOrRebalancing()) {
+            synchronized (changeThreadCount) {
+                for (final StreamThread streamThread : threads) {
+                    if (streamThread.isAlive()) {
                         streamThread.shutdown();
+                        if (!streamThread.getName().equals(Thread.currentThread().getName())) {

Review comment:
       I think we should assert one or the other at least, i.e., we only ever remove the current thread or we only ever remove a different thread. And document this clearly, of course 🙂




----------------------------------------------------------------



[GitHub] [kafka] wcarlson5 commented on a change in pull request #9695: KAFKA-10500: Remove thread

Posted by GitBox <gi...@apache.org>.
wcarlson5 commented on a change in pull request #9695:
URL: https://github.com/apache/kafka/pull/9695#discussion_r538935213



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -924,22 +924,64 @@ private StreamThread createStreamThread(final long cacheSizePerThread, final int
      * @return name of the added stream thread or empty if a new stream thread could not be added
      */
     public Optional<String> addStreamThread() {
-        synchronized (changeThreadCount) {
-            if (isRunningOrRebalancing()) {
-                final int threadIdx = getNextThreadIndex();
-                final long cacheSizePerThread = getCacheSizePerThread(threads.size() + 1);
+        if (isRunningOrRebalancing()) {
+            final int threadIdx;
+            final long cacheSizePerThread;
+            synchronized (changeThreadCount) {
+                threadIdx = getNextThreadIndex();
+                cacheSizePerThread = getCacheSizePerThread(threads.size() + 1);
                 resizeThreadCache(cacheSizePerThread);
-                final StreamThread streamThread = createStreamThread(cacheSizePerThread, threadIdx);
-                synchronized (stateLock) {
-                    if (isRunningOrRebalancing()) {
-                        streamThread.start();
-                        return Optional.of(streamThread.getName());
-                    } else {
-                        streamThread.shutdown();
+            }
+            final StreamThread streamThread = createStreamThread(cacheSizePerThread, threadIdx);
+
+            synchronized (stateLock) {
+                if (isRunningOrRebalancing()) {
+                    streamThread.start();
+                    return Optional.of(streamThread.getName());
+                } else {
+                    streamThread.shutdown();
+                    threads.remove(streamThread);
+                    resizeThreadCache(getCacheSizePerThread(threads.size()));
+                    return Optional.empty();
+                }
+            }
+        }
+        return Optional.empty();
+    }
+
+    /**
+     * Removes one stream thread out of the running stream threads from this Kafka Streams client.
+     * <p>
+     * The removed stream thread is gracefully shut down. This method does not specify which stream
+     * thread is shut down.
+     * <p>
+     * Since the number of stream threads decreases, the sizes of the caches in the remaining stream
+     * threads are adapted so that the sum of the cache sizes over all stream threads equals the total
+     * cache size specified in configuration {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG}.
+     *
+     * @return name of the removed stream thread or empty if a stream thread could not be removed because
+     *         no stream threads are alive
+     */
+    public Optional<String> removeStreamThread() {
+        if (isRunningOrRebalancing()) {
+            for (final StreamThread streamThread : threads) {
+                if (streamThread.isAlive()) {
+                    streamThread.shutdown();
+                    while (streamThread.state() != StreamThread.State.DEAD && !streamThread.getName().equals(Thread.currentThread().getName())) {
+                        try {
+                            synchronized (streamThread.state()) {
+                                streamThread.state().wait(100);
+                            }
+                        } catch (final InterruptedException e) {
+                            e.printStackTrace();

Review comment:
       Alright, I am resolving this for clarity.




----------------------------------------------------------------



[GitHub] [kafka] cadonna commented on a change in pull request #9695: KAFKA-10500: Remove thread

Posted by GitBox <gi...@apache.org>.
cadonna commented on a change in pull request #9695:
URL: https://github.com/apache/kafka/pull/9695#discussion_r553988171



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -1432,7 +1480,9 @@ public void cleanUp() {
         validateIsRunningOrRebalancing();
         final Set<ThreadMetadata> threadMetadata = new HashSet<>();
         for (final StreamThread thread : threads) {
-            threadMetadata.add(thread.threadMetadata());
+            if (thread.state() != StreamThread.State.DEAD) {
+                threadMetadata.add(thread.threadMetadata());
+            }
         }
         return threadMetadata;

Review comment:
       Ah yes, you are right! The stream thread might be replaced or just shut down normally, which would not be synchronized on the `changeThreadCount` lock. However, we still do not guarantee that the state is correct when the method returns, because the state could change after the lock is released but before the method returns. At that point it does not matter whether we hold the lock or not. Either we find something that synchronizes the whole method, or we can also remove the synchronisation on the stream thread state. And even if we find something that synchronizes the whole method, I am not sure this guarantee is worth the hassle. WDYT?
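
       To illustrate why the result is only a point-in-time view, here is a small sketch under the assumption that each thread exposes a volatile state that can change at any moment. `ThreadSnapshotSketch`, `Worker`, and `liveNames` are hypothetical names; the real method returns `ThreadMetadata` objects, not names.

```java
import java.util.ArrayList;
import java.util.List;

final class ThreadSnapshotSketch {

    enum State { RUNNING, DEAD }

    // Stand-in for a stream thread with a state that other threads may change.
    static final class Worker {
        final String name;
        volatile State state = State.RUNNING;

        Worker(final String name) {
            this.name = name;
        }
    }

    // Copies the names of all workers that are not DEAD at the moment each one
    // is inspected. The returned list is not updated afterwards.
    static List<String> liveNames(final List<Worker> workers) {
        final List<String> names = new ArrayList<>();
        for (final Worker worker : workers) {
            if (worker.state != State.DEAD) {
                names.add(worker.name);
            }
        }
        return names;
    }
}
```

       If a worker dies right after `liveNames` returns, the caller still holds a list that names it, regardless of any lock taken inside the method.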




----------------------------------------------------------------



[GitHub] [kafka] wcarlson5 commented on a change in pull request #9695: KAFKA-10500: Remove thread

Posted by GitBox <gi...@apache.org>.
wcarlson5 commented on a change in pull request #9695:
URL: https://github.com/apache/kafka/pull/9695#discussion_r538939857



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -924,28 +924,69 @@ private StreamThread createStreamThread(final long cacheSizePerThread, final int
      * @return name of the added stream thread or empty if a new stream thread could not be added
      */
     public Optional<String> addStreamThread() {
-        synchronized (changeThreadCount) {
-            if (isRunningOrRebalancing()) {
-                final int threadIdx = getNextThreadIndex();
-                final long cacheSizePerThread = getCacheSizePerThread(threads.size() + 1);
+        if (isRunningOrRebalancing()) {
+            final int threadIdx;
+            final long cacheSizePerThread;
+            synchronized (changeThreadCount) {
+                threadIdx = getNextThreadIndex();
+                cacheSizePerThread = getCacheSizePerThread(threads.size() + 1);
                 resizeThreadCache(cacheSizePerThread);
-                final StreamThread streamThread = createStreamThread(cacheSizePerThread, threadIdx);
-                synchronized (stateLock) {
-                    if (isRunningOrRebalancing()) {
-                        streamThread.start();
-                        return Optional.of(streamThread.getName());
-                    } else {
-                        streamThread.shutdown();
+            }
+            final StreamThread streamThread = createStreamThread(cacheSizePerThread, threadIdx);
+
+            synchronized (stateLock) {
+                if (isRunningOrRebalancing()) {
+                    streamThread.start();
+                    return Optional.of(streamThread.getName());
+                } else {
+                    streamThread.shutdown();
+                    threads.remove(streamThread);
+                    resizeThreadCache(getCacheSizePerThread(threads.size()));
+                    return Optional.empty();
+                }
+            }
+        }
+        return Optional.empty();
+    }
+
+    /**
+     * Removes one stream thread out of the running stream threads from this Kafka Streams client.
+     *
+     * The removed stream thread is gracefully shut down. This method does not specify which stream
+     * thread is shut down.
+     *
+     * Since the number of stream threads decreases, the sizes of the caches in the remaining stream
+     * threads are adapted so that the sum of the cache sizes over all stream threads equals the total
+     * cache size specified in configuration {@code cache.max.bytes.buffering}.
+     *
+     * @return name of the removed stream thread or empty if a stream thread could not be removed because
+     *         no stream threads are alive
+     */
+    public Optional<String> removeStreamThread() {
+        if (isRunningOrRebalancing()) {
+            for (final StreamThread streamThread : threads) {
+                if (streamThread.isAlive()) {
+                    streamThread.shutdown();
+                    while (streamThread.state() != StreamThread.State.DEAD && !streamThread.getName().equals(Thread.currentThread().getName())) {

Review comment:
       > Ah right you mean if removeThread() is called from the exception handler. Thanks for clarifying
   
   It doesn't have to be from the handler. A thread that calls `removeThread()` could easily be removing itself.
   
   In which case your concern about the OOM might apply as well. I don't see a good way around it, though.




----------------------------------------------------------------



[GitHub] [kafka] wcarlson5 commented on a change in pull request #9695: KAFKA-10500: Remove thread

Posted by GitBox <gi...@apache.org>.
wcarlson5 commented on a change in pull request #9695:
URL: https://github.com/apache/kafka/pull/9695#discussion_r553005685



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -947,24 +948,65 @@ private StreamThread createStreamThread(final long cacheSizePerThread, final int
      * @return name of the added stream thread or empty if a new stream thread could not be added
      */
     public Optional<String> addStreamThread() {
-        synchronized (changeThreadCount) {
-            if (isRunningOrRebalancing()) {
-                final int threadIdx = getNextThreadIndex();
-                final long cacheSizePerThread = getCacheSizePerThread(threads.size() + 1);
+        if (isRunningOrRebalancing()) {
+            final int threadIdx;
+            final long cacheSizePerThread;
+            synchronized (changeThreadCount) {
+                threadIdx = getNextThreadIndex();
+                cacheSizePerThread = getCacheSizePerThread(threads.size() + 1);
                 resizeThreadCache(cacheSizePerThread);
-                final StreamThread streamThread = createStreamThread(cacheSizePerThread, threadIdx);
-                synchronized (stateLock) {
-                    if (isRunningOrRebalancing()) {
-                        streamThread.start();
-                        return Optional.of(streamThread.getName());
-                    } else {
+            }
+            final StreamThread streamThread = createStreamThread(cacheSizePerThread, threadIdx);
+
+            synchronized (stateLock) {
+                if (isRunningOrRebalancing()) {
+                    streamThread.start();
+                    return Optional.of(streamThread.getName());
+                } else {
+                    streamThread.shutdown();
+                    threads.remove(streamThread);
+                    resizeThreadCache(getCacheSizePerThread(threads.size()));
+                    log.warn("Cannot add a stream thread in state " + state());
+                    return Optional.empty();
+                }
+            }
+        }
+        log.warn("Cannot add a stream thread in state " + state());
+        return Optional.empty();
+    }
+
+    /**
+     * Removes one stream thread out of the running stream threads from this Kafka Streams client.
+     * <p>
+     * The removed stream thread is gracefully shut down. This method does not specify which stream
+     * thread is shut down.
+     * <p>
+     * Since the number of stream threads decreases, the sizes of the caches in the remaining stream
+     * threads are adapted so that the sum of the cache sizes over all stream threads equals the total
+     * cache size specified in configuration {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG}.
+     *
+     * @return name of the removed stream thread or empty if a stream thread could not be removed because
+     *         no stream threads are alive
+     */
+    public Optional<String> removeStreamThread() {
+        if (isRunningOrRebalancing()) {
+            synchronized (changeThreadCount) {
+                for (final StreamThread streamThread : threads) {
+                    if (streamThread.isAlive()) {
                         streamThread.shutdown();
+                        if (!streamThread.getName().equals(Thread.currentThread().getName())) {

Review comment:
       @ableegoldman I am not sure about only letting a thread remove itself. Threads other than stream threads can call this too.







[GitHub] [kafka] ableegoldman commented on a change in pull request #9695: KAFKA-10500: Remove thread

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #9695:
URL: https://github.com/apache/kafka/pull/9695#discussion_r553002043



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -947,24 +948,65 @@ private StreamThread createStreamThread(final long cacheSizePerThread, final int
      * @return name of the added stream thread or empty if a new stream thread could not be added
      */
     public Optional<String> addStreamThread() {
-        synchronized (changeThreadCount) {
-            if (isRunningOrRebalancing()) {
-                final int threadIdx = getNextThreadIndex();
-                final long cacheSizePerThread = getCacheSizePerThread(threads.size() + 1);
+        if (isRunningOrRebalancing()) {
+            final int threadIdx;
+            final long cacheSizePerThread;
+            synchronized (changeThreadCount) {
+                threadIdx = getNextThreadIndex();
+                cacheSizePerThread = getCacheSizePerThread(threads.size() + 1);
                 resizeThreadCache(cacheSizePerThread);
-                final StreamThread streamThread = createStreamThread(cacheSizePerThread, threadIdx);
-                synchronized (stateLock) {
-                    if (isRunningOrRebalancing()) {
-                        streamThread.start();
-                        return Optional.of(streamThread.getName());
-                    } else {
+            }
+            final StreamThread streamThread = createStreamThread(cacheSizePerThread, threadIdx);
+
+            synchronized (stateLock) {
+                if (isRunningOrRebalancing()) {
+                    streamThread.start();
+                    return Optional.of(streamThread.getName());
+                } else {
+                    streamThread.shutdown();
+                    threads.remove(streamThread);
+                    resizeThreadCache(getCacheSizePerThread(threads.size()));
+                    log.warn("Cannot add a stream thread in state " + state());
+                    return Optional.empty();
+                }
+            }
+        }
+        log.warn("Cannot add a stream thread in state " + state());
+        return Optional.empty();
+    }
+
+    /**
+     * Removes one stream thread out of the running stream threads from this Kafka Streams client.
+     * <p>
+     * The removed stream thread is gracefully shut down. This method does not specify which stream
+     * thread is shut down.
+     * <p>
+     * Since the number of stream threads decreases, the sizes of the caches in the remaining stream
+     * threads are adapted so that the sum of the cache sizes over all stream threads equals the total
+     * cache size specified in configuration {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG}.
+     *
+     * @return name of the removed stream thread or empty if a stream thread could not be removed because
+     *         no stream threads are alive
+     */
+    public Optional<String> removeStreamThread() {
+        if (isRunningOrRebalancing()) {
+            synchronized (changeThreadCount) {
+                for (final StreamThread streamThread : threads) {
+                    if (streamThread.isAlive()) {
                         streamThread.shutdown();
+                        if (!streamThread.getName().equals(Thread.currentThread().getName())) {

Review comment:
       I know Bruno brought this up already and maybe I just missed the resolution in the previous comments, but: should we allow a StreamThread to remove itself? Originally I was thinking "no", but I was just thinking about what the expected behavior of this method would be when called from a StreamThread, and I actually think we should consider ONLY removing the calling thread. I get the sense that users will interpret `removeStreamThread()`, when called from a StreamThread, as essentially saying "shut down this thread", not "remove some random thread". WDYT?







[GitHub] [kafka] wcarlson5 commented on a change in pull request #9695: KAFKA-10500: Remove thread

Posted by GitBox <gi...@apache.org>.
wcarlson5 commented on a change in pull request #9695:
URL: https://github.com/apache/kafka/pull/9695#discussion_r551597471



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -924,22 +924,64 @@ private StreamThread createStreamThread(final long cacheSizePerThread, final int
      * @return name of the added stream thread or empty if a new stream thread could not be added
      */
     public Optional<String> addStreamThread() {
-        synchronized (changeThreadCount) {
-            if (isRunningOrRebalancing()) {
-                final int threadIdx = getNextThreadIndex();
-                final long cacheSizePerThread = getCacheSizePerThread(threads.size() + 1);
+        if (isRunningOrRebalancing()) {
+            final int threadIdx;
+            final long cacheSizePerThread;
+            synchronized (changeThreadCount) {
+                threadIdx = getNextThreadIndex();
+                cacheSizePerThread = getCacheSizePerThread(threads.size() + 1);
                 resizeThreadCache(cacheSizePerThread);
-                final StreamThread streamThread = createStreamThread(cacheSizePerThread, threadIdx);
-                synchronized (stateLock) {
-                    if (isRunningOrRebalancing()) {
-                        streamThread.start();
-                        return Optional.of(streamThread.getName());
-                    } else {
-                        streamThread.shutdown();
+            }
+            final StreamThread streamThread = createStreamThread(cacheSizePerThread, threadIdx);
+
+            synchronized (stateLock) {
+                if (isRunningOrRebalancing()) {
+                    streamThread.start();
+                    return Optional.of(streamThread.getName());
+                } else {
+                    streamThread.shutdown();
+                    threads.remove(streamThread);
+                    resizeThreadCache(getCacheSizePerThread(threads.size()));
+                    return Optional.empty();
+                }
+            }
+        }
+        return Optional.empty();
+    }
+
+    /**
+     * Removes one stream thread out of the running stream threads from this Kafka Streams client.
+     * <p>
+     * The removed stream thread is gracefully shut down. This method does not specify which stream
+     * thread is shut down.
+     * <p>
+     * Since the number of stream threads decreases, the sizes of the caches in the remaining stream
+     * threads are adapted so that the sum of the cache sizes over all stream threads equals the total
+     * cache size specified in configuration {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG}.
+     *
+     * @return name of the removed stream thread or empty if a stream thread could not be removed because
+     *         no stream threads are alive
+     */
+    public Optional<String> removeStreamThread() {
+        if (isRunningOrRebalancing()) {
+            for (final StreamThread streamThread : threads) {

Review comment:
       Okay, we won't get a concurrent modification, but the result won't be deterministic. I think we can just synchronize the iteration in remove thread, as that is the only place where we are iterating and making changes. In the other places, concurrent access should not be a problem.
   
   I've added tests for the remove and add. There does not seem to be a problem, but it is hard to guarantee that the race condition is hit each time.
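
   A minimal sketch of the idea, assuming a plain list guarded by one lock. `ThreadRegistrySketch` and its methods are hypothetical stand-ins for illustration, not the real `KafkaStreams` internals; only the lock name `changeThreadCount` is taken from the diff.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Hypothetical stand-in for the client; not the real KafkaStreams class.
final class ThreadRegistrySketch {
    // Plays the same role as the changeThreadCount lock in the diff.
    private final Object changeThreadCount = new Object();
    private final List<String> threads = new ArrayList<>();

    void add(final String name) {
        synchronized (changeThreadCount) {
            threads.add(name);
        }
    }

    // Selects and removes one entry while holding the lock, so two
    // concurrent callers can never pick the same entry.
    Optional<String> removeOne() {
        synchronized (changeThreadCount) {
            if (threads.isEmpty()) {
                return Optional.empty();
            }
            return Optional.of(threads.remove(0));
        }
    }

    int size() {
        synchronized (changeThreadCount) {
            return threads.size();
        }
    }
}
```

   Because selection and removal happen under the same lock, the outcome of concurrent calls depends only on the order in which callers acquire the lock.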







[GitHub] [kafka] wcarlson5 commented on a change in pull request #9695: KAFKA-10500: Remove thread

Posted by GitBox <gi...@apache.org>.
wcarlson5 commented on a change in pull request #9695:
URL: https://github.com/apache/kafka/pull/9695#discussion_r553004791



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -947,24 +948,65 @@ private StreamThread createStreamThread(final long cacheSizePerThread, final int
      * @return name of the added stream thread or empty if a new stream thread could not be added
      */
     public Optional<String> addStreamThread() {
-        synchronized (changeThreadCount) {
-            if (isRunningOrRebalancing()) {
-                final int threadIdx = getNextThreadIndex();
-                final long cacheSizePerThread = getCacheSizePerThread(threads.size() + 1);
+        if (isRunningOrRebalancing()) {
+            final int threadIdx;
+            final long cacheSizePerThread;
+            synchronized (changeThreadCount) {
+                threadIdx = getNextThreadIndex();
+                cacheSizePerThread = getCacheSizePerThread(threads.size() + 1);
                 resizeThreadCache(cacheSizePerThread);
-                final StreamThread streamThread = createStreamThread(cacheSizePerThread, threadIdx);
-                synchronized (stateLock) {
-                    if (isRunningOrRebalancing()) {
-                        streamThread.start();
-                        return Optional.of(streamThread.getName());
-                    } else {
+            }
+            final StreamThread streamThread = createStreamThread(cacheSizePerThread, threadIdx);
+
+            synchronized (stateLock) {
+                if (isRunningOrRebalancing()) {
+                    streamThread.start();
+                    return Optional.of(streamThread.getName());
+                } else {
+                    streamThread.shutdown();
+                    threads.remove(streamThread);
+                    resizeThreadCache(getCacheSizePerThread(threads.size()));
+                    log.warn("Cannot add a stream thread in state " + state());
+                    return Optional.empty();
+                }
+            }
+        }
+        log.warn("Cannot add a stream thread in state " + state());
+        return Optional.empty();
+    }
+
+    /**
+     * Removes one stream thread out of the running stream threads from this Kafka Streams client.
+     * <p>
+     * The removed stream thread is gracefully shut down. This method does not specify which stream
+     * thread is shut down.
+     * <p>
+     * Since the number of stream threads decreases, the sizes of the caches in the remaining stream
+     * threads are adapted so that the sum of the cache sizes over all stream threads equals the total
+     * cache size specified in configuration {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG}.
+     *
+     * @return name of the removed stream thread or empty if a stream thread could not be removed because
+     *         no stream threads are alive
+     */
+    public Optional<String> removeStreamThread() {
+        if (isRunningOrRebalancing()) {
+            synchronized (changeThreadCount) {
+                for (final StreamThread streamThread : threads) {
+                    if (streamThread.isAlive()) {
                         streamThread.shutdown();
+                        if (!streamThread.getName().equals(Thread.currentThread().getName())) {

Review comment:
       We want to avoid removing the calling thread because the idea is that the call will block until the thread is gone. This is best for the cache resizing too. However, I do not like the idea of never letting a thread remove itself: if only one stream thread is left (maybe alongside a global thread) and that thread cannot remove itself, the feature seems incomplete. If we need to make this more structured, maybe a thread only removes itself if it is the only thread running? That should make the cache and blocking issues less of a problem. How does that sound?
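
       The rule proposed here can be sketched as follows. This is an illustration under assumptions: thread names stand in for `StreamThread` objects, and `RemovalCandidateSketch`, `pick`, and `shouldWaitForShutdown` are hypothetical names that do not exist in Kafka.

```java
import java.util.List;
import java.util.Optional;

// Hypothetical helper; thread names stand in for StreamThread instances.
final class RemovalCandidateSketch {
    // Returns the first thread that may be removed: any thread other than
    // the caller, or the caller itself when it is the only thread left.
    static Optional<String> pick(final List<String> threadNames, final String callerName) {
        for (final String name : threadNames) {
            if (!name.equals(callerName) || threadNames.size() == 1) {
                return Optional.of(name);
            }
        }
        return Optional.empty();
    }

    // The caller blocks only when it removed a different thread; a thread
    // waiting for its own shutdown to complete would never return.
    static boolean shouldWaitForShutdown(final String removedName, final String callerName) {
        return !removedName.equals(callerName);
    }
}
```

       With two threads `t1` and `t2`, a call from `t1` picks `t2` and can block until it is gone. With only `t1` left, a call from `t1` picks `t1` and must not block.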







[GitHub] [kafka] wcarlson5 commented on a change in pull request #9695: KAFKA-10500: Remove thread

Posted by GitBox <gi...@apache.org>.
wcarlson5 commented on a change in pull request #9695:
URL: https://github.com/apache/kafka/pull/9695#discussion_r540351596



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -924,25 +924,64 @@ private StreamThread createStreamThread(final long cacheSizePerThread, final int
      * @return name of the added stream thread or empty if a new stream thread could not be added
      */
     public Optional<String> addStreamThread() {
-        synchronized (changeThreadCount) {
-            if (isRunningOrRebalancing()) {
-                final int threadIdx = getNextThreadIndex();
-                final long cacheSizePerThread = getCacheSizePerThread(threads.size() + 1);
+        if (isRunningOrRebalancing()) {
+            final int threadIdx;
+            final long cacheSizePerThread;
+            synchronized (changeThreadCount) {
+                threadIdx = getNextThreadIndex();
+                cacheSizePerThread = getCacheSizePerThread(threads.size() + 1);
                 resizeThreadCache(cacheSizePerThread);
-                final StreamThread streamThread = createStreamThread(cacheSizePerThread, threadIdx);
-                synchronized (stateLock) {
-                    if (isRunningOrRebalancing()) {
-                        streamThread.start();
-                        return Optional.of(streamThread.getName());
-                    } else {
-                        streamThread.shutdown();
+            }
+            final StreamThread streamThread = createStreamThread(cacheSizePerThread, threadIdx);
+
+            synchronized (stateLock) {
+                if (isRunningOrRebalancing()) {
+                    streamThread.start();
+                    return Optional.of(streamThread.getName());
+                } else {
+                    streamThread.shutdown();
+                    threads.remove(streamThread);
+                    resizeThreadCache(getCacheSizePerThread(threads.size()));
+                    log.warn("Cannot add a stream thread in state " + state());
+                    return Optional.empty();
+                }
+            }
+        }
+        log.warn("Cannot add a stream thread in state " + state());
+        return Optional.empty();
+    }
+
+    /**
+     * Removes one stream thread out of the running stream threads from this Kafka Streams client.
+     * <p>
+     * The removed stream thread is gracefully shut down. This method does not specify which stream
+     * thread is shut down.
+     * <p>
+     * Since the number of stream threads decreases, the sizes of the caches in the remaining stream
+     * threads are adapted so that the sum of the cache sizes over all stream threads equals the total
+     * cache size specified in configuration {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG}.
+     *
+     * @return name of the removed stream thread or empty if a stream thread could not be removed because
+     *         no stream threads are alive
+     */
+    public Optional<String> removeStreamThread() {
+        if (isRunningOrRebalancing()) {
+            for (final StreamThread streamThread : threads) {
+                if (streamThread.isAlive()) {
+                    streamThread.shutdown();
+                    if (!streamThread.getName().equals(Thread.currentThread().getName())) {

Review comment:
       Maybe there is a global thread and a stream thread and the user only wants a global thread now?
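
       If the last stream thread is removed, the per-thread cache size has to be computed for zero remaining stream threads. A small sketch of an even split that guards this case, assuming the total is divided only among stream threads; `CacheSplitSketch` and `perThread` are hypothetical names, and the real `getCacheSizePerThread` may also account for a global thread.

```java
// Hypothetical helper; the real getCacheSizePerThread may differ.
final class CacheSplitSketch {
    // Splits the total cache evenly among the remaining stream threads.
    // Returns 0 when no stream thread remains, avoiding a division by zero.
    static long perThread(final long totalCacheBytes, final int remainingThreads) {
        if (remainingThreads < 0) {
            throw new IllegalArgumentException("remainingThreads must not be negative");
        }
        return remainingThreads == 0 ? 0L : totalCacheBytes / remainingThreads;
    }
}
```

       For example, `perThread(10_000_000L, 2)` yields 5,000,000 bytes per thread, and `perThread(10_000_000L, 0)` yields 0.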

##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -924,25 +924,64 @@ private StreamThread createStreamThread(final long cacheSizePerThread, final int
      * @return name of the added stream thread or empty if a new stream thread could not be added
      */
     public Optional<String> addStreamThread() {
-        synchronized (changeThreadCount) {
-            if (isRunningOrRebalancing()) {
-                final int threadIdx = getNextThreadIndex();
-                final long cacheSizePerThread = getCacheSizePerThread(threads.size() + 1);
+        if (isRunningOrRebalancing()) {
+            final int threadIdx;
+            final long cacheSizePerThread;
+            synchronized (changeThreadCount) {
+                threadIdx = getNextThreadIndex();
+                cacheSizePerThread = getCacheSizePerThread(threads.size() + 1);
                 resizeThreadCache(cacheSizePerThread);
-                final StreamThread streamThread = createStreamThread(cacheSizePerThread, threadIdx);
-                synchronized (stateLock) {
-                    if (isRunningOrRebalancing()) {
-                        streamThread.start();
-                        return Optional.of(streamThread.getName());
-                    } else {
-                        streamThread.shutdown();
+            }
+            final StreamThread streamThread = createStreamThread(cacheSizePerThread, threadIdx);
+
+            synchronized (stateLock) {
+                if (isRunningOrRebalancing()) {
+                    streamThread.start();
+                    return Optional.of(streamThread.getName());
+                } else {
+                    streamThread.shutdown();
+                    threads.remove(streamThread);
+                    resizeThreadCache(getCacheSizePerThread(threads.size()));
+                    log.warn("Cannot add a stream thread in state " + state());
+                    return Optional.empty();
+                }
+            }
+        }
+        log.warn("Cannot add a stream thread in state " + state());
+        return Optional.empty();
+    }
+
+    /**
+     * Removes one stream thread out of the running stream threads from this Kafka Streams client.
+     * <p>
+     * The removed stream thread is gracefully shut down. This method does not specify which stream
+     * thread is shut down.
+     * <p>
+     * Since the number of stream threads decreases, the sizes of the caches in the remaining stream
+     * threads are adapted so that the sum of the cache sizes over all stream threads equals the total
+     * cache size specified in configuration {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG}.
+     *
+     * @return name of the removed stream thread or empty if a stream thread could not be removed because
+     *         no stream threads are alive
+     */
+    public Optional<String> removeStreamThread() {
+        if (isRunningOrRebalancing()) {
+            for (final StreamThread streamThread : threads) {
+                if (streamThread.isAlive()) {
+                    streamThread.shutdown();
+                    if (!streamThread.getName().equals(Thread.currentThread().getName())) {
+                        streamThread.waitOnThreadState(StreamThread.State.DEAD);
+                    }
+                    synchronized (changeThreadCount) {
+                        final long cacheSizePerThread = threads.size() == 1 ? 0 : getCacheSizePerThread(threads.size() - 1);
+                        resizeThreadCache(cacheSizePerThread);
                         threads.remove(streamThread);
-                        resizeThreadCache(getCacheSizePerThread(threads.size()));
-                        return Optional.empty();
                     }
+                    return Optional.of(streamThread.getName());
                 }
             }
         }
+        log.warn("Cannot remove a stream thread in state " + state());

Review comment:
       There are two cases: either there are no threads that can be removed, or the client is in the wrong state.
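
       One way to make the two cases visible to the user is to log a different reason for each. A sketch under assumptions: `RemoveFailureSketch` and `reason` are hypothetical, and the message texts are illustrative rather than the wording used in the PR.

```java
// Hypothetical helper that names the reason a removal attempt failed.
final class RemoveFailureSketch {
    // Returns a human-readable reason, or an empty string when removal is possible.
    static String reason(final boolean runningOrRebalancing, final int removableThreads) {
        if (!runningOrRebalancing) {
            return "client is not in state RUNNING or REBALANCING";
        }
        if (removableThreads == 0) {
            return "no stream thread is available for removal";
        }
        return "";
    }
}
```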

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java
##########
@@ -108,4 +108,53 @@ public void shouldAddStreamThread() throws Exception {
             TestUtils.waitForCondition(() -> kafkaStreams.state() == KafkaStreams.State.RUNNING, "wait for running");
         }
     }
+
+    @Test
+    public void shouldRemoveStreamThread() throws Exception {
+        try (final KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), properties)) {
+            StreamsTestUtils.startKafkaStreamsAndWaitForRunningState(kafkaStreams);
+            final int oldThreadCount = kafkaStreams.localThreadsMetadata().size();
+            kafkaStreams.removeStreamThread();
+            assertThat(kafkaStreams.localThreadsMetadata().size(), equalTo(oldThreadCount - 1));
+        }
+    }
+
+    @Test
+    public void shouldAddAndRemoveStreamThreadsWhileKeepingNamesCorrect() throws Exception {
+        try (final KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), properties)) {
+            StreamsTestUtils.startKafkaStreamsAndWaitForRunningState(kafkaStreams);
+            int oldThreadCount = kafkaStreams.localThreadsMetadata().size();
+            assertThat(kafkaStreams.localThreadsMetadata().stream().map(t -> t.threadName().split("-StreamThread-")[1]).sorted().toArray(), equalTo(new String[] {"1", "2"}));
+
+            final Optional<String> name = kafkaStreams.addStreamThread();
+
+            assertThat(name, CoreMatchers.not(Optional.empty()));
+            TestUtils.waitForCondition(
+                () -> kafkaStreams.localThreadsMetadata().stream().sequential()
+                        .map(ThreadMetadata::threadName).anyMatch(t -> t.equals(name.orElse(""))),
+                "Wait for the thread to be added"
+            );
+            assertThat(kafkaStreams.localThreadsMetadata().size(), equalTo(oldThreadCount + 1));
+            assertThat(kafkaStreams.localThreadsMetadata().stream().map(t -> t.threadName().split("-StreamThread-")[1]).sorted().toArray(), equalTo(new String[] {"1", "2", "3"}));
+            TestUtils.waitForCondition(() -> kafkaStreams.state() == KafkaStreams.State.RUNNING, "wait for running");
+
+
+            oldThreadCount = kafkaStreams.localThreadsMetadata().size();
+            kafkaStreams.removeStreamThread();

Review comment:
       yes

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java
##########
@@ -108,4 +108,53 @@ public void shouldAddStreamThread() throws Exception {
             TestUtils.waitForCondition(() -> kafkaStreams.state() == KafkaStreams.State.RUNNING, "wait for running");
         }
     }
+
+    @Test
+    public void shouldRemoveStreamThread() throws Exception {
+        try (final KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), properties)) {
+            StreamsTestUtils.startKafkaStreamsAndWaitForRunningState(kafkaStreams);
+            final int oldThreadCount = kafkaStreams.localThreadsMetadata().size();
+            kafkaStreams.removeStreamThread();
+            assertThat(kafkaStreams.localThreadsMetadata().size(), equalTo(oldThreadCount - 1));
+        }
+    }
+
+    @Test
+    public void shouldAddAndRemoveStreamThreadsWhileKeepingNamesCorrect() throws Exception {
+        try (final KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), properties)) {
+            StreamsTestUtils.startKafkaStreamsAndWaitForRunningState(kafkaStreams);
+            int oldThreadCount = kafkaStreams.localThreadsMetadata().size();
+            assertThat(kafkaStreams.localThreadsMetadata().stream().map(t -> t.threadName().split("-StreamThread-")[1]).sorted().toArray(), equalTo(new String[] {"1", "2"}));
+
+            final Optional<String> name = kafkaStreams.addStreamThread();
+
+            assertThat(name, CoreMatchers.not(Optional.empty()));
+            TestUtils.waitForCondition(
+                () -> kafkaStreams.localThreadsMetadata().stream().sequential()
+                        .map(ThreadMetadata::threadName).anyMatch(t -> t.equals(name.orElse(""))),

Review comment:
       it can be either

##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -924,25 +924,64 @@ private StreamThread createStreamThread(final long cacheSizePerThread, final int
      * @return name of the added stream thread or empty if a new stream thread could not be added
      */
     public Optional<String> addStreamThread() {
-        synchronized (changeThreadCount) {
-            if (isRunningOrRebalancing()) {
-                final int threadIdx = getNextThreadIndex();
-                final long cacheSizePerThread = getCacheSizePerThread(threads.size() + 1);
+        if (isRunningOrRebalancing()) {
+            final int threadIdx;
+            final long cacheSizePerThread;
+            synchronized (changeThreadCount) {
+                threadIdx = getNextThreadIndex();
+                cacheSizePerThread = getCacheSizePerThread(threads.size() + 1);
                 resizeThreadCache(cacheSizePerThread);
-                final StreamThread streamThread = createStreamThread(cacheSizePerThread, threadIdx);
-                synchronized (stateLock) {
-                    if (isRunningOrRebalancing()) {
-                        streamThread.start();
-                        return Optional.of(streamThread.getName());
-                    } else {
-                        streamThread.shutdown();
+            }
+            final StreamThread streamThread = createStreamThread(cacheSizePerThread, threadIdx);
+
+            synchronized (stateLock) {
+                if (isRunningOrRebalancing()) {
+                    streamThread.start();
+                    return Optional.of(streamThread.getName());
+                } else {
+                    streamThread.shutdown();
+                    threads.remove(streamThread);
+                    resizeThreadCache(getCacheSizePerThread(threads.size()));
+                    log.warn("Cannot add a stream thread in state " + state());
+                    return Optional.empty();
+                }
+            }
+        }
+        log.warn("Cannot add a stream thread in state " + state());
+        return Optional.empty();
+    }
+
+    /**
+     * Removes one stream thread out of the running stream threads from this Kafka Streams client.
+     * <p>
+     * The removed stream thread is gracefully shut down. This method does not specify which stream
+     * thread is shut down.
+     * <p>
+     * Since the number of stream threads decreases, the sizes of the caches in the remaining stream
+     * threads are adapted so that the sum of the cache sizes over all stream threads equals the total
+     * cache size specified in configuration {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG}.
+     *
+     * @return name of the removed stream thread or empty if a stream thread could not be removed because
+     *         no stream threads are alive
+     */
+    public Optional<String> removeStreamThread() {
+        if (isRunningOrRebalancing()) {
+            for (final StreamThread streamThread : threads) {
+                if (streamThread.isAlive()) {

Review comment:
       We also want to avoid removing threads in PENDING_SHUTDOWN. There should not be any dead threads; this check is to make sure another call to removeThread is not removing the same thread.
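
       The guard described here can be sketched with a small state map. This is an illustration under assumptions: `ShutdownGuardSketch`, its `State` enum, and `beginRemoval` are hypothetical and only mirror the state names mentioned in the discussion.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical registry that tracks one state per thread name.
final class ShutdownGuardSketch {
    enum State { RUNNING, PENDING_SHUTDOWN, DEAD }

    private final Map<String, State> threads = new LinkedHashMap<>();

    synchronized void register(final String name) {
        threads.put(name, State.RUNNING);
    }

    // Picks the first RUNNING thread and marks it PENDING_SHUTDOWN, so a
    // second call skips it and selects a different thread.
    synchronized Optional<String> beginRemoval() {
        for (final Map.Entry<String, State> entry : threads.entrySet()) {
            if (entry.getValue() == State.RUNNING) {
                entry.setValue(State.PENDING_SHUTDOWN);
                return Optional.of(entry.getKey());
            }
        }
        return Optional.empty();
    }
}
```

       Two back-to-back calls therefore return two different threads, and a third call on a two-thread registry returns empty.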

##########
File path: streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
##########
@@ -631,6 +634,23 @@ public void shouldNotAddThreadWhenError() {
         assertThat(streams.threads.size(), equalTo(oldSize));
     }
 
+    @Test
+    public void shouldRemoveThread() throws InterruptedException {
+        props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2);
+        final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time);
+        streams.start();
+        final int oldSize = streams.threads.size();
+        TestUtils.waitForCondition(() -> streams.state() == KafkaStreams.State.RUNNING, 15L, "wait until running");
+        assertThat(streams.removeStreamThread(), equalTo(Optional.of("newThread")));
+        assertThat(streams.threads.size(), equalTo(oldSize - 1));
+    }
+
+    @Test
+    public void shouldNotRemoveThreadWhenNotRunning() {
+        props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1);
+        final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time);
+        assertThat(streams.removeStreamThread(), equalTo(Optional.empty()));
+    }
 

Review comment:
       There is no case where a streamThread should be dead in the thread list

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java
##########
@@ -108,4 +108,53 @@ public void shouldAddStreamThread() throws Exception {
             TestUtils.waitForCondition(() -> kafkaStreams.state() == KafkaStreams.State.RUNNING, "wait for running");
         }
     }
+
+    @Test
+    public void shouldRemoveStreamThread() throws Exception {
+        try (final KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), properties)) {
+            StreamsTestUtils.startKafkaStreamsAndWaitForRunningState(kafkaStreams);
+            final int oldThreadCount = kafkaStreams.localThreadsMetadata().size();
+            kafkaStreams.removeStreamThread();
+            assertThat(kafkaStreams.localThreadsMetadata().size(), equalTo(oldThreadCount - 1));
+        }
+    }
+
+    @Test
+    public void shouldAddAndRemoveStreamThreadsWhileKeepingNamesCorrect() throws Exception {
+        try (final KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), properties)) {
+            StreamsTestUtils.startKafkaStreamsAndWaitForRunningState(kafkaStreams);
+            int oldThreadCount = kafkaStreams.localThreadsMetadata().size();
+            assertThat(kafkaStreams.localThreadsMetadata().stream().map(t -> t.threadName().split("-StreamThread-")[1]).sorted().toArray(), equalTo(new String[] {"1", "2"}));
+
+            final Optional<String> name = kafkaStreams.addStreamThread();
+
+            assertThat(name, CoreMatchers.not(Optional.empty()));
+            TestUtils.waitForCondition(
+                () -> kafkaStreams.localThreadsMetadata().stream().sequential()
+                        .map(ThreadMetadata::threadName).anyMatch(t -> t.equals(name.orElse(""))),
+                "Wait for the thread to be added"
+            );
+            assertThat(kafkaStreams.localThreadsMetadata().size(), equalTo(oldThreadCount + 1));
+            assertThat(kafkaStreams.localThreadsMetadata().stream().map(t -> t.threadName().split("-StreamThread-")[1]).sorted().toArray(), equalTo(new String[] {"1", "2", "3"}));
+            TestUtils.waitForCondition(() -> kafkaStreams.state() == KafkaStreams.State.RUNNING, "wait for running");
+
+
+            oldThreadCount = kafkaStreams.localThreadsMetadata().size();
+            kafkaStreams.removeStreamThread();
+            assertThat(kafkaStreams.localThreadsMetadata().size(), equalTo(oldThreadCount - 1));
+            assertThat(kafkaStreams.localThreadsMetadata().stream().map(t -> t.threadName().split("-StreamThread-")[1]).sorted().toArray(), equalTo(new String[] {"2", "3"}));

Review comment:
       That is true, will remove.

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java
##########
@@ -108,4 +108,53 @@ public void shouldAddStreamThread() throws Exception {
             TestUtils.waitForCondition(() -> kafkaStreams.state() == KafkaStreams.State.RUNNING, "wait for running");
         }
     }
+
+    @Test
+    public void shouldRemoveStreamThread() throws Exception {
+        try (final KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), properties)) {
+            StreamsTestUtils.startKafkaStreamsAndWaitForRunningState(kafkaStreams);
+            final int oldThreadCount = kafkaStreams.localThreadsMetadata().size();
+            kafkaStreams.removeStreamThread();
+            assertThat(kafkaStreams.localThreadsMetadata().size(), equalTo(oldThreadCount - 1));
+        }
+    }
+
+    @Test
+    public void shouldAddAndRemoveStreamThreadsWhileKeepingNamesCorrect() throws Exception {
+        try (final KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), properties)) {
+            StreamsTestUtils.startKafkaStreamsAndWaitForRunningState(kafkaStreams);
+            int oldThreadCount = kafkaStreams.localThreadsMetadata().size();
+            assertThat(kafkaStreams.localThreadsMetadata().stream().map(t -> t.threadName().split("-StreamThread-")[1]).sorted().toArray(), equalTo(new String[] {"1", "2"}));
+
+            final Optional<String> name = kafkaStreams.addStreamThread();
+
+            assertThat(name, CoreMatchers.not(Optional.empty()));
+            TestUtils.waitForCondition(
+                () -> kafkaStreams.localThreadsMetadata().stream().sequential()
+                        .map(ThreadMetadata::threadName).anyMatch(t -> t.equals(name.orElse(""))),
+                "Wait for the thread to be added"
+            );
+            assertThat(kafkaStreams.localThreadsMetadata().size(), equalTo(oldThreadCount + 1));
+            assertThat(kafkaStreams.localThreadsMetadata().stream().map(t -> t.threadName().split("-StreamThread-")[1]).sorted().toArray(), equalTo(new String[] {"1", "2", "3"}));
+            TestUtils.waitForCondition(() -> kafkaStreams.state() == KafkaStreams.State.RUNNING, "wait for running");
+
+
+            oldThreadCount = kafkaStreams.localThreadsMetadata().size();
+            kafkaStreams.removeStreamThread();
+            assertThat(kafkaStreams.localThreadsMetadata().size(), equalTo(oldThreadCount - 1));
+            assertThat(kafkaStreams.localThreadsMetadata().stream().map(t -> t.threadName().split("-StreamThread-")[1]).sorted().toArray(), equalTo(new String[] {"2", "3"}));
+
+            final Optional<String> name2 = kafkaStreams.addStreamThread();
+
+            assertThat(name, CoreMatchers.not(Optional.empty()));

Review comment:
       good catch

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java
##########
@@ -108,4 +108,53 @@ public void shouldAddStreamThread() throws Exception {
             TestUtils.waitForCondition(() -> kafkaStreams.state() == KafkaStreams.State.RUNNING, "wait for running");
         }
     }
+
+    @Test
+    public void shouldRemoveStreamThread() throws Exception {
+        try (final KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), properties)) {
+            StreamsTestUtils.startKafkaStreamsAndWaitForRunningState(kafkaStreams);
+            final int oldThreadCount = kafkaStreams.localThreadsMetadata().size();
+            kafkaStreams.removeStreamThread();
+            assertThat(kafkaStreams.localThreadsMetadata().size(), equalTo(oldThreadCount - 1));
+        }
+    }
+
+    @Test
+    public void shouldAddAndRemoveStreamThreadsWhileKeepingNamesCorrect() throws Exception {
+        try (final KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), properties)) {
+            StreamsTestUtils.startKafkaStreamsAndWaitForRunningState(kafkaStreams);
+            int oldThreadCount = kafkaStreams.localThreadsMetadata().size();
+            assertThat(kafkaStreams.localThreadsMetadata().stream().map(t -> t.threadName().split("-StreamThread-")[1]).sorted().toArray(), equalTo(new String[] {"1", "2"}));
+
+            final Optional<String> name = kafkaStreams.addStreamThread();
+
+            assertThat(name, CoreMatchers.not(Optional.empty()));
+            TestUtils.waitForCondition(
+                () -> kafkaStreams.localThreadsMetadata().stream().sequential()
+                        .map(ThreadMetadata::threadName).anyMatch(t -> t.equals(name.orElse(""))),
+                "Wait for the thread to be added"
+            );
+            assertThat(kafkaStreams.localThreadsMetadata().size(), equalTo(oldThreadCount + 1));
+            assertThat(kafkaStreams.localThreadsMetadata().stream().map(t -> t.threadName().split("-StreamThread-")[1]).sorted().toArray(), equalTo(new String[] {"1", "2", "3"}));
+            TestUtils.waitForCondition(() -> kafkaStreams.state() == KafkaStreams.State.RUNNING, "wait for running");
+
+
+            oldThreadCount = kafkaStreams.localThreadsMetadata().size();
+            kafkaStreams.removeStreamThread();
+            assertThat(kafkaStreams.localThreadsMetadata().size(), equalTo(oldThreadCount - 1));
+            assertThat(kafkaStreams.localThreadsMetadata().stream().map(t -> t.threadName().split("-StreamThread-")[1]).sorted().toArray(), equalTo(new String[] {"2", "3"}));
+
+            final Optional<String> name2 = kafkaStreams.addStreamThread();
+
+            assertThat(name, CoreMatchers.not(Optional.empty()));
+            TestUtils.waitForCondition(
+                () -> kafkaStreams.localThreadsMetadata().stream().sequential()
+                        .map(ThreadMetadata::threadName).anyMatch(t -> t.equals(name2.orElse(""))),
+                "Wait for the thread to be added"
+            );
+            assertThat(kafkaStreams.localThreadsMetadata().size(), equalTo(oldThreadCount));
+            assertThat(kafkaStreams.localThreadsMetadata().stream().map(t -> t.threadName().split("-StreamThread-")[1]).sorted().toArray(), equalTo(new String[] {"1", "2", "3"}));

Review comment:
       will do

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##########
@@ -597,6 +598,18 @@ public void setStreamsUncaughtExceptionHandler(final java.util.function.Consumer
         this.streamsUncaughtExceptionHandler = streamsUncaughtExceptionHandler;
     }
 
+    public void waitOnThreadState(final StreamThread.State targetState) {
+        synchronized (stateLock) {
+            while (state != targetState) {
+                try {
+                    stateLock.wait();
+                } catch (final InterruptedException e) {
+                    e.printStackTrace();

Review comment:
       I will do what the wait-on-state method in KafkaStreams does.
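
For illustration only, here is a minimal sketch of a timed wait on a state lock. The names `StateWaiter`, `setState` and `waitOnState` are hypothetical and this is not the code in `KafkaStreams` or `StreamThread`. It assumes one reasonable alternative to `e.printStackTrace()`: restore the interrupt flag and report that the target state was not reached.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for an object whose state is guarded by a lock.
final class StateWaiter {
    enum State { RUNNING, PENDING_SHUTDOWN, DEAD }

    private final Object stateLock = new Object();
    private State state = State.RUNNING;

    // Changes the state and wakes up every thread blocked in waitOnState.
    void setState(final State newState) {
        synchronized (stateLock) {
            state = newState;
            stateLock.notifyAll();
        }
    }

    // Returns true if targetState was reached within roughly waitMs milliseconds,
    // false on timeout or if the waiting thread was interrupted.
    boolean waitOnState(final State targetState, final long waitMs) {
        final long deadlineNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(waitMs);
        synchronized (stateLock) {
            while (state != targetState) {
                final long remainingMs =
                    TimeUnit.NANOSECONDS.toMillis(deadlineNanos - System.nanoTime());
                if (remainingMs <= 0) {
                    // Never call wait(0): that would block without a timeout.
                    return false;
                }
                try {
                    stateLock.wait(remainingMs);
                } catch (final InterruptedException e) {
                    // Assumption: preserve the interrupt for the caller instead of swallowing it.
                    Thread.currentThread().interrupt();
                    return false;
                }
            }
            return true;
        }
    }
}
```

The loop re-checks the state after every wake-up, so spurious wake-ups and unrelated state changes do not end the wait early.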

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java
##########
@@ -108,4 +108,53 @@ public void shouldAddStreamThread() throws Exception {
             TestUtils.waitForCondition(() -> kafkaStreams.state() == KafkaStreams.State.RUNNING, "wait for running");
         }
     }
+
+    @Test
+    public void shouldRemoveStreamThread() throws Exception {
+        try (final KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), properties)) {
+            StreamsTestUtils.startKafkaStreamsAndWaitForRunningState(kafkaStreams);
+            final int oldThreadCount = kafkaStreams.localThreadsMetadata().size();
+            kafkaStreams.removeStreamThread();
+            assertThat(kafkaStreams.localThreadsMetadata().size(), equalTo(oldThreadCount - 1));
+        }
+    }
+
+    @Test
+    public void shouldAddAndRemoveStreamThreadsWhileKeepingNamesCorrect() throws Exception {
+        try (final KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), properties)) {
+            StreamsTestUtils.startKafkaStreamsAndWaitForRunningState(kafkaStreams);
+            int oldThreadCount = kafkaStreams.localThreadsMetadata().size();
+            assertThat(kafkaStreams.localThreadsMetadata().stream().map(t -> t.threadName().split("-StreamThread-")[1]).sorted().toArray(), equalTo(new String[] {"1", "2"}));
+
+            final Optional<String> name = kafkaStreams.addStreamThread();
+
+            assertThat(name, CoreMatchers.not(Optional.empty()));

Review comment:
       We can verify here; we verify the names a few lines later as well.

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java
##########
@@ -108,4 +108,53 @@ public void shouldAddStreamThread() throws Exception {
             TestUtils.waitForCondition(() -> kafkaStreams.state() == KafkaStreams.State.RUNNING, "wait for running");
         }
     }
+
+    @Test
+    public void shouldRemoveStreamThread() throws Exception {
+        try (final KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), properties)) {
+            StreamsTestUtils.startKafkaStreamsAndWaitForRunningState(kafkaStreams);
+            final int oldThreadCount = kafkaStreams.localThreadsMetadata().size();
+            kafkaStreams.removeStreamThread();
+            assertThat(kafkaStreams.localThreadsMetadata().size(), equalTo(oldThreadCount - 1));
+        }
+    }
+
+    @Test
+    public void shouldAddAndRemoveStreamThreadsWhileKeepingNamesCorrect() throws Exception {
+        try (final KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), properties)) {
+            StreamsTestUtils.startKafkaStreamsAndWaitForRunningState(kafkaStreams);
+            int oldThreadCount = kafkaStreams.localThreadsMetadata().size();
+            assertThat(kafkaStreams.localThreadsMetadata().stream().map(t -> t.threadName().split("-StreamThread-")[1]).sorted().toArray(), equalTo(new String[] {"1", "2"}));
+
+            final Optional<String> name = kafkaStreams.addStreamThread();
+
+            assertThat(name, CoreMatchers.not(Optional.empty()));
+            TestUtils.waitForCondition(
+                () -> kafkaStreams.localThreadsMetadata().stream().sequential()
+                        .map(ThreadMetadata::threadName).anyMatch(t -> t.equals(name.orElse(""))),
+                "Wait for the thread to be added"
+            );
+            assertThat(kafkaStreams.localThreadsMetadata().size(), equalTo(oldThreadCount + 1));
+            assertThat(kafkaStreams.localThreadsMetadata().stream().map(t -> t.threadName().split("-StreamThread-")[1]).sorted().toArray(), equalTo(new String[] {"1", "2", "3"}));
+            TestUtils.waitForCondition(() -> kafkaStreams.state() == KafkaStreams.State.RUNNING, "wait for running");
+
+
+            oldThreadCount = kafkaStreams.localThreadsMetadata().size();
+            kafkaStreams.removeStreamThread();

Review comment:
       yes







[GitHub] [kafka] wcarlson5 commented on a change in pull request #9695: KAFKA-10500: Remove thread

Posted by GitBox <gi...@apache.org>.
wcarlson5 commented on a change in pull request #9695:
URL: https://github.com/apache/kafka/pull/9695#discussion_r553014145



##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java
##########
@@ -97,15 +99,132 @@ public void shouldAddStreamThread() throws Exception {
 
             final Optional<String> name = kafkaStreams.addStreamThread();
 
-            assertThat(name, CoreMatchers.not(Optional.empty()));
+            assertThat(name, not(Optional.empty()));
             TestUtils.waitForCondition(
                 () -> kafkaStreams.localThreadsMetadata().stream().sequential()
                         .map(ThreadMetadata::threadName).anyMatch(t -> t.equals(name.orElse(""))),
                 "Wait for the thread to be added"
             );
             assertThat(kafkaStreams.localThreadsMetadata().size(), equalTo(oldThreadCount + 1));
-            assertThat(kafkaStreams.localThreadsMetadata().stream().map(t -> t.threadName().split("-StreamThread-")[1]).sorted().toArray(), equalTo(new String[] {"1", "2", "3"}));
-            TestUtils.waitForCondition(() -> kafkaStreams.state() == KafkaStreams.State.RUNNING, "wait for running");
+            assertThat(
+                kafkaStreams
+                    .localThreadsMetadata()
+                    .stream()
+                    .map(t -> t.threadName().split("-StreamThread-")[1])
+                    .sorted().toArray(),
+                equalTo(new String[] {"1", "2", "3"})
+            );
+            TestUtils.waitForCondition(
+                () -> kafkaStreams.state() == KafkaStreams.State.RUNNING,
+                "Kafka Streams client did not reach state RUNNING"
+            );
+        }
+    }
+
+    @Test
+    public void shouldRemoveStreamThread() throws Exception {

Review comment:
       The above PR doesn't have the remove, as it's in this PR. Bit of a circular dependency :)
   We do test that the client stays in RUNNING after shutting down some threads. We can add a test that adds threads after we shut them down. However, I would like to test it with remove thread anyway, so if this gets merged first I will update the above PR.







[GitHub] [kafka] wcarlson5 commented on a change in pull request #9695: KAFKA-10500: Remove thread

Posted by GitBox <gi...@apache.org>.
wcarlson5 commented on a change in pull request #9695:
URL: https://github.com/apache/kafka/pull/9695#discussion_r553005717



##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java
##########
@@ -97,15 +99,132 @@ public void shouldAddStreamThread() throws Exception {
 
             final Optional<String> name = kafkaStreams.addStreamThread();
 
-            assertThat(name, CoreMatchers.not(Optional.empty()));
+            assertThat(name, not(Optional.empty()));
             TestUtils.waitForCondition(
                 () -> kafkaStreams.localThreadsMetadata().stream().sequential()
                         .map(ThreadMetadata::threadName).anyMatch(t -> t.equals(name.orElse(""))),
                 "Wait for the thread to be added"
             );
             assertThat(kafkaStreams.localThreadsMetadata().size(), equalTo(oldThreadCount + 1));
-            assertThat(kafkaStreams.localThreadsMetadata().stream().map(t -> t.threadName().split("-StreamThread-")[1]).sorted().toArray(), equalTo(new String[] {"1", "2", "3"}));
-            TestUtils.waitForCondition(() -> kafkaStreams.state() == KafkaStreams.State.RUNNING, "wait for running");
+            assertThat(
+                kafkaStreams
+                    .localThreadsMetadata()
+                    .stream()
+                    .map(t -> t.threadName().split("-StreamThread-")[1])
+                    .sorted().toArray(),
+                equalTo(new String[] {"1", "2", "3"})
+            );
+            TestUtils.waitForCondition(
+                () -> kafkaStreams.state() == KafkaStreams.State.RUNNING,
+                "Kafka Streams client did not reach state RUNNING"
+            );
+        }
+    }
+
+    @Test
+    public void shouldRemoveStreamThread() throws Exception {
+        try (final KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), properties)) {
+            StreamsTestUtils.startKafkaStreamsAndWaitForRunningState(kafkaStreams);
+            final int oldThreadCount = kafkaStreams.localThreadsMetadata().size();
+            assertThat(kafkaStreams.removeStreamThread().get().split("-")[0], equalTo(appId));
+            assertThat(kafkaStreams.localThreadsMetadata().size(), equalTo(oldThreadCount - 1));
+        }
+    }
+
+    @Test
+    public void shouldAddAndRemoveThreads() throws InterruptedException {
+        try (final KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), properties)) {
+            StreamsTestUtils.startKafkaStreamsAndWaitForRunningState(kafkaStreams);
+            final int oldThreadCount = kafkaStreams.localThreadsMetadata().size();
+            final CountDownLatch latch = new CountDownLatch(2);
+            final Thread one = adjustCountHelperThread(kafkaStreams, 4, latch);
+            final Thread two = adjustCountHelperThread(kafkaStreams, 6, latch);
+            two.start();
+            one.start();
+            latch.await(30, TimeUnit.SECONDS);
+            assertThat(kafkaStreams.localThreadsMetadata().size(), equalTo(oldThreadCount));
+        }

Review comment:
       good idea







[GitHub] [kafka] cadonna commented on a change in pull request #9695: KAFKA-10500: Remove thread

Posted by GitBox <gi...@apache.org>.
cadonna commented on a change in pull request #9695:
URL: https://github.com/apache/kafka/pull/9695#discussion_r540857996



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -924,25 +924,64 @@ private StreamThread createStreamThread(final long cacheSizePerThread, final int
      * @return name of the added stream thread or empty if a new stream thread could not be added
      */
     public Optional<String> addStreamThread() {
-        synchronized (changeThreadCount) {
-            if (isRunningOrRebalancing()) {
-                final int threadIdx = getNextThreadIndex();
-                final long cacheSizePerThread = getCacheSizePerThread(threads.size() + 1);
+        if (isRunningOrRebalancing()) {
+            final int threadIdx;
+            final long cacheSizePerThread;
+            synchronized (changeThreadCount) {
+                threadIdx = getNextThreadIndex();
+                cacheSizePerThread = getCacheSizePerThread(threads.size() + 1);
                 resizeThreadCache(cacheSizePerThread);
-                final StreamThread streamThread = createStreamThread(cacheSizePerThread, threadIdx);
-                synchronized (stateLock) {
-                    if (isRunningOrRebalancing()) {
-                        streamThread.start();
-                        return Optional.of(streamThread.getName());
-                    } else {
-                        streamThread.shutdown();
+            }
+            final StreamThread streamThread = createStreamThread(cacheSizePerThread, threadIdx);
+
+            synchronized (stateLock) {
+                if (isRunningOrRebalancing()) {
+                    streamThread.start();
+                    return Optional.of(streamThread.getName());
+                } else {
+                    streamThread.shutdown();
+                    threads.remove(streamThread);
+                    resizeThreadCache(getCacheSizePerThread(threads.size()));
+                    log.warn("Cannot add a stream thread in state " + state());
+                    return Optional.empty();
+                }
+            }
+        }
+        log.warn("Cannot add a stream thread in state " + state());
+        return Optional.empty();
+    }
+
+    /**
+     * Removes one stream thread out of the running stream threads from this Kafka Streams client.
+     * <p>
+     * The removed stream thread is gracefully shut down. This method does not specify which stream
+     * thread is shut down.
+     * <p>
+     * Since the number of stream threads decreases, the sizes of the caches in the remaining stream
+     * threads are adapted so that the sum of the cache sizes over all stream threads equals the total
+     * cache size specified in configuration {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG}.
+     *
+     * @return name of the removed stream thread or empty if a stream thread could not be removed because
+     *         no stream threads are alive
+     */
+    public Optional<String> removeStreamThread() {
+        if (isRunningOrRebalancing()) {
+            for (final StreamThread streamThread : threads) {
+                if (streamThread.isAlive()) {

Review comment:
       Good point about not removing stream threads in `PENDING_SHUTDOWN`. 
   
   > There should not be any dead threads, this check is to make sure another call to removeThread is not removing the same thread.
   
   Right! Thanks to the new uncaught exception handler!







[GitHub] [kafka] wcarlson5 commented on a change in pull request #9695: KAFKA-10500: Remove thread

Posted by GitBox <gi...@apache.org>.
wcarlson5 commented on a change in pull request #9695:
URL: https://github.com/apache/kafka/pull/9695#discussion_r553043731



##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java
##########
@@ -128,6 +133,7 @@ public void shouldRemoveStreamThread() throws Exception {
             final int oldThreadCount = kafkaStreams.localThreadsMetadata().size();
             assertThat(kafkaStreams.removeStreamThread().get().split("-")[0], equalTo(appId));
             assertThat(kafkaStreams.localThreadsMetadata().size(), equalTo(oldThreadCount - 1));
+            waitForApplicationState(Collections.singletonList(kafkaStreams), KafkaStreams.State.RUNNING, DEFAULT_DURATION);

Review comment:
       Ah okay, that makes sense.







[GitHub] [kafka] wcarlson5 commented on a change in pull request #9695: KAFKA-10500: Remove thread

Posted by GitBox <gi...@apache.org>.
wcarlson5 commented on a change in pull request #9695:
URL: https://github.com/apache/kafka/pull/9695#discussion_r538940948



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -924,28 +924,69 @@ private StreamThread createStreamThread(final long cacheSizePerThread, final int
      * @return name of the added stream thread or empty if a new stream thread could not be added
      */
     public Optional<String> addStreamThread() {
-        synchronized (changeThreadCount) {
-            if (isRunningOrRebalancing()) {
-                final int threadIdx = getNextThreadIndex();
-                final long cacheSizePerThread = getCacheSizePerThread(threads.size() + 1);
+        if (isRunningOrRebalancing()) {
+            final int threadIdx;
+            final long cacheSizePerThread;
+            synchronized (changeThreadCount) {
+                threadIdx = getNextThreadIndex();
+                cacheSizePerThread = getCacheSizePerThread(threads.size() + 1);
                 resizeThreadCache(cacheSizePerThread);
-                final StreamThread streamThread = createStreamThread(cacheSizePerThread, threadIdx);
-                synchronized (stateLock) {
-                    if (isRunningOrRebalancing()) {
-                        streamThread.start();
-                        return Optional.of(streamThread.getName());
-                    } else {
-                        streamThread.shutdown();
+            }
+            final StreamThread streamThread = createStreamThread(cacheSizePerThread, threadIdx);
+
+            synchronized (stateLock) {
+                if (isRunningOrRebalancing()) {
+                    streamThread.start();
+                    return Optional.of(streamThread.getName());
+                } else {
+                    streamThread.shutdown();
+                    threads.remove(streamThread);
+                    resizeThreadCache(getCacheSizePerThread(threads.size()));
+                    return Optional.empty();
+                }
+            }
+        }
+        return Optional.empty();
+    }
+
+    /**
+     * Removes one stream thread out of the running stream threads from this Kafka Streams client.
+     *
+     * The removed stream thread is gracefully shut down. This method does not specify which stream
+     * thread is shut down.
+     *
+     * Since the number of stream threads decreases, the sizes of the caches in the remaining stream
+     * threads are adapted so that the sum of the cache sizes over all stream threads equals the total
+     * cache size specified in configuration {@code cache.max.bytes.buffering}.
+     *
+     * @return name of the removed stream thread or empty if a stream thread could not be removed because
+     *         no stream threads are alive
+     */
+    public Optional<String> removeStreamThread() {
+        if (isRunningOrRebalancing()) {
+            for (final StreamThread streamThread : threads) {
+                if (streamThread.isAlive()) {
+                    streamThread.shutdown();
+                    while (streamThread.state() != StreamThread.State.DEAD && !streamThread.getName().equals(Thread.currentThread().getName())) {

Review comment:
       >As for swallowing the InterruptedException, it still makes me uncomfortable but maybe there was/is a good reason that we do it already. Or, maybe you're just the first person to notice that we're doing that.
   
   I hope there is a good reason, but we don't seem to have a problem with it yet so....







[GitHub] [kafka] ableegoldman commented on a change in pull request #9695: KAFKA-10500: Remove thread

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #9695:
URL: https://github.com/apache/kafka/pull/9695#discussion_r553023675



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -992,7 +992,7 @@ private StreamThread createStreamThread(final long cacheSizePerThread, final int
         if (isRunningOrRebalancing()) {
             synchronized (changeThreadCount) {
                 for (final StreamThread streamThread : threads) {
-                    if (streamThread.isAlive()) {
+                    if (streamThread.isAlive() && (!streamThread.getName().equals(Thread.currentThread().getName()) || threads.size() == 1)) {

Review comment:
       I guess technically this might not work if we have only one live thread left, but other dead threads still in the list -- in that case we might skip over the live thread when we really should have removed it. I'm not sure how much of a problem this would really be, just wanted to point it out.
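
To make the edge case concrete, here is a small sketch that models threads as plain objects. `RemovalCandidateSketch`, `Worker`, `pickByListSize` and `pickByLiveCount` are hypothetical names, not Kafka Streams code. The first method mirrors the condition in the diff; the second is one possible variant that counts live entries instead of relying on the list size.

```java
import java.util.List;
import java.util.Optional;

final class RemovalCandidateSketch {
    // Hypothetical stand-in for a stream thread: a name and a liveness flag.
    static final class Worker {
        final String name;
        final boolean alive;

        Worker(final String name, final boolean alive) {
            this.name = name;
            this.alive = alive;
        }
    }

    // Mirrors the diff: skip the caller unless the list holds exactly one entry.
    static Optional<Worker> pickByListSize(final List<Worker> workers, final String callerName) {
        for (final Worker worker : workers) {
            if (worker.alive && (!worker.name.equals(callerName) || workers.size() == 1)) {
                return Optional.of(worker);
            }
        }
        return Optional.empty();
    }

    // Variant: skip the caller unless it is the only live entry, dead entries ignored.
    static Optional<Worker> pickByLiveCount(final List<Worker> workers, final String callerName) {
        long liveCount = 0;
        for (final Worker worker : workers) {
            if (worker.alive) {
                liveCount++;
            }
        }
        for (final Worker worker : workers) {
            if (worker.alive && (!worker.name.equals(callerName) || liveCount == 1)) {
                return Optional.of(worker);
            }
        }
        return Optional.empty();
    }
}
```

With one dead and one live entry, and the live entry as the caller, `pickByListSize` finds nothing while `pickByLiveCount` returns the live entry. If dead threads are never kept in the list, the two behave the same.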







[GitHub] [kafka] ableegoldman commented on a change in pull request #9695: KAFKA-10500: Remove thread

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #9695:
URL: https://github.com/apache/kafka/pull/9695#discussion_r544643577



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -924,22 +924,64 @@ private StreamThread createStreamThread(final long cacheSizePerThread, final int
      * @return name of the added stream thread or empty if a new stream thread could not be added
      */
     public Optional<String> addStreamThread() {
-        synchronized (changeThreadCount) {
-            if (isRunningOrRebalancing()) {
-                final int threadIdx = getNextThreadIndex();
-                final long cacheSizePerThread = getCacheSizePerThread(threads.size() + 1);
+        if (isRunningOrRebalancing()) {
+            final int threadIdx;
+            final long cacheSizePerThread;
+            synchronized (changeThreadCount) {
+                threadIdx = getNextThreadIndex();
+                cacheSizePerThread = getCacheSizePerThread(threads.size() + 1);
                 resizeThreadCache(cacheSizePerThread);
-                final StreamThread streamThread = createStreamThread(cacheSizePerThread, threadIdx);
-                synchronized (stateLock) {
-                    if (isRunningOrRebalancing()) {
-                        streamThread.start();
-                        return Optional.of(streamThread.getName());
-                    } else {
-                        streamThread.shutdown();
+            }
+            final StreamThread streamThread = createStreamThread(cacheSizePerThread, threadIdx);
+
+            synchronized (stateLock) {
+                if (isRunningOrRebalancing()) {
+                    streamThread.start();
+                    return Optional.of(streamThread.getName());
+                } else {
+                    streamThread.shutdown();
+                    threads.remove(streamThread);
+                    resizeThreadCache(getCacheSizePerThread(threads.size()));
+                    return Optional.empty();
+                }
+            }
+        }
+        return Optional.empty();
+    }
+
+    /**
+     * Removes one stream thread out of the running stream threads from this Kafka Streams client.
+     * <p>
+     * The removed stream thread is gracefully shut down. This method does not specify which stream
+     * thread is shut down.
+     * <p>
+     * Since the number of stream threads decreases, the sizes of the caches in the remaining stream
+     * threads are adapted so that the sum of the cache sizes over all stream threads equals the total
+     * cache size specified in configuration {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG}.
+     *
+     * @return name of the removed stream thread or empty if a stream thread could not be removed because
+     *         no stream threads are alive
+     */
+    public Optional<String> removeStreamThread() {
+        if (isRunningOrRebalancing()) {
+            for (final StreamThread streamThread : threads) {

Review comment:
       Ah, I missed the previous PR where you changed the list type. OK, I'm not that familiar with `Collections.synchronizedList`, but I'm still worried we may not be safe with this. From the [javadocs](https://docs.oracle.com/javase/7/docs/api/java/util/Collections.html#synchronizedList(java.util.List)):
   
   ```
   It is imperative that the user manually synchronize on the returned list when iterating over it:
   ```
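
A minimal sketch of what that javadoc asks for, using a list of names instead of stream threads. `SynchronizedIterationSketch` and `firstMatching` are hypothetical names for illustration, not code from this PR.

```java
import java.util.List;
import java.util.Optional;

final class SynchronizedIterationSketch {
    // Expects a list created with Collections.synchronizedList(...).
    // Single calls such as add() or remove() are synchronized by the wrapper,
    // but an iteration spans many calls, so the caller must hold the list's lock.
    static Optional<String> firstMatching(final List<String> names, final String suffix) {
        synchronized (names) {
            for (final String name : names) {
                if (name.endsWith(suffix)) {
                    return Optional.of(name);
                }
            }
        }
        return Optional.empty();
    }
}
```

Without the `synchronized (names)` block, a concurrent `add` or `remove` from another thread during the loop can cause a `ConcurrentModificationException` or otherwise non-deterministic behavior.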







[GitHub] [kafka] ableegoldman commented on a change in pull request #9695: KAFKA-10500: Remove thread

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #9695:
URL: https://github.com/apache/kafka/pull/9695#discussion_r538921676



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -924,28 +924,69 @@ private StreamThread createStreamThread(final long cacheSizePerThread, final int
      * @return name of the added stream thread or empty if a new stream thread could not be added
      */
     public Optional<String> addStreamThread() {
-        synchronized (changeThreadCount) {
-            if (isRunningOrRebalancing()) {
-                final int threadIdx = getNextThreadIndex();
-                final long cacheSizePerThread = getCacheSizePerThread(threads.size() + 1);
+        if (isRunningOrRebalancing()) {
+            final int threadIdx;
+            final long cacheSizePerThread;
+            synchronized (changeThreadCount) {
+                threadIdx = getNextThreadIndex();
+                cacheSizePerThread = getCacheSizePerThread(threads.size() + 1);
                 resizeThreadCache(cacheSizePerThread);
-                final StreamThread streamThread = createStreamThread(cacheSizePerThread, threadIdx);
-                synchronized (stateLock) {
-                    if (isRunningOrRebalancing()) {
-                        streamThread.start();
-                        return Optional.of(streamThread.getName());
-                    } else {
-                        streamThread.shutdown();
+            }
+            final StreamThread streamThread = createStreamThread(cacheSizePerThread, threadIdx);
+
+            synchronized (stateLock) {
+                if (isRunningOrRebalancing()) {
+                    streamThread.start();
+                    return Optional.of(streamThread.getName());
+                } else {
+                    streamThread.shutdown();
+                    threads.remove(streamThread);
+                    resizeThreadCache(getCacheSizePerThread(threads.size()));
+                    return Optional.empty();
+                }
+            }
+        }
+        return Optional.empty();
+    }
+
+    /**
+     * Removes one stream thread out of the running stream threads from this Kafka Streams client.
+     *
+     * The removed stream thread is gracefully shut down. This method does not specify which stream
+     * thread is shut down.
+     *
+     * Since the number of stream threads decreases, the sizes of the caches in the remaining stream
+     * threads are adapted so that the sum of the cache sizes over all stream threads equals the total
+     * cache size specified in configuration {@code cache.max.bytes.buffering}.
+     *
+     * @return name of the removed stream thread or empty if a stream thread could not be removed because
+     *         no stream threads are alive
+     */
+    public Optional<String> removeStreamThread() {
+        if (isRunningOrRebalancing()) {
+            for (final StreamThread streamThread : threads) {
+                if (streamThread.isAlive()) {
+                    streamThread.shutdown();
+                    while (streamThread.state() != StreamThread.State.DEAD && !streamThread.getName().equals(Thread.currentThread().getName())) {

Review comment:
       Ok in that case I think my concerns from [here](https://github.com/apache/kafka/pull/9695#discussion_r537976391) apply as well: is it safe to immediately redistribute the memory from the cache if the removed thread hasn't necessarily closed yet? Definitely risks an OOM, but maybe that risk is low (or at least acceptable). But we definitely can't remove the thread from `threads` until it's shut down since that puts its thread id back on the menu, and the ids have to be unique. How should we handle this?
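
To illustrate the id concern, here is a rough sketch under the assumption that the next thread index is derived from the names still present in the list. `nextThreadIndex` is a hypothetical stand-in, not the actual `getNextThreadIndex()` from `KafkaStreams`.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class ThreadIndexSketch {

    // Hypothetical allocator: returns the smallest positive index that no name
    // in the list currently ends with.
    static int nextThreadIndex(final List<String> threadNames) {
        final Set<Integer> used = new HashSet<>();
        for (final String name : threadNames) {
            used.add(Integer.parseInt(name.substring(name.lastIndexOf('-') + 1)));
        }
        int idx = 1;
        while (used.contains(idx)) {
            idx++;
        }
        return idx;
    }

    public static void main(final String[] args) {
        // Thread 2 has been dropped from the list, so its index is handed out again.
        System.out.println(nextThreadIndex(Arrays.asList("app-StreamThread-1", "app-StreamThread-3")));
    }
}
```

With an allocator like this, dropping a name from the list while that thread is still shutting down would let a new thread receive the same index, which is the uniqueness problem described above.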







[GitHub] [kafka] ableegoldman commented on a change in pull request #9695: KAFKA-10500: Remove thread

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #9695:
URL: https://github.com/apache/kafka/pull/9695#discussion_r544645060



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -924,22 +924,64 @@ private StreamThread createStreamThread(final long cacheSizePerThread, final int
      * @return name of the added stream thread or empty if a new stream thread could not be added
      */
     public Optional<String> addStreamThread() {
-        synchronized (changeThreadCount) {
-            if (isRunningOrRebalancing()) {
-                final int threadIdx = getNextThreadIndex();
-                final long cacheSizePerThread = getCacheSizePerThread(threads.size() + 1);
+        if (isRunningOrRebalancing()) {
+            final int threadIdx;
+            final long cacheSizePerThread;
+            synchronized (changeThreadCount) {
+                threadIdx = getNextThreadIndex();
+                cacheSizePerThread = getCacheSizePerThread(threads.size() + 1);
                 resizeThreadCache(cacheSizePerThread);
-                final StreamThread streamThread = createStreamThread(cacheSizePerThread, threadIdx);
-                synchronized (stateLock) {
-                    if (isRunningOrRebalancing()) {
-                        streamThread.start();
-                        return Optional.of(streamThread.getName());
-                    } else {
-                        streamThread.shutdown();
+            }
+            final StreamThread streamThread = createStreamThread(cacheSizePerThread, threadIdx);
+
+            synchronized (stateLock) {
+                if (isRunningOrRebalancing()) {
+                    streamThread.start();
+                    return Optional.of(streamThread.getName());
+                } else {
+                    streamThread.shutdown();
+                    threads.remove(streamThread);
+                    resizeThreadCache(getCacheSizePerThread(threads.size()));
+                    return Optional.empty();
+                }
+            }
+        }
+        return Optional.empty();
+    }
+
+    /**
+     * Removes one stream thread out of the running stream threads from this Kafka Streams client.
+     * <p>
+     * The removed stream thread is gracefully shut down. This method does not specify which stream
+     * thread is shut down.
+     * <p>
+     * Since the number of stream threads decreases, the sizes of the caches in the remaining stream
+     * threads are adapted so that the sum of the cache sizes over all stream threads equals the total
+     * cache size specified in configuration {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG}.
+     *
+     * @return name of the removed stream thread or empty if a stream thread could not be removed because
+     *         no stream threads are alive
+     */
+    public Optional<String> removeStreamThread() {
+        if (isRunningOrRebalancing()) {
+            for (final StreamThread streamThread : threads) {

Review comment:
       It wouldn't be guaranteed to catch this, but either way I think we should have a test that starts up two threads which both try to `removeStreamThread()` at the same time (and maybe similarly for `addStreamThread()`).
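
One possible shape for such a test, sketched against a toy stand-in rather than the real `KafkaStreams` client. `ToyPool` and `removeConcurrently` are hypothetical names. A `CountDownLatch` releases both callers together so their `removeOne()` calls are likely to overlap.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class ConcurrentRemoveSketch {

    // Hypothetical stand-in for the client: removal is guarded by a single lock.
    static final class ToyPool {
        private final List<String> names = new ArrayList<>();

        ToyPool(final int size) {
            for (int i = 1; i <= size; i++) {
                names.add("app-StreamThread-" + i);
            }
        }

        synchronized Optional<String> removeOne() {
            return names.isEmpty() ? Optional.empty() : Optional.of(names.remove(0));
        }

        synchronized int size() {
            return names.size();
        }
    }

    // Releases two callers at once and returns what each removeOne() call returned.
    static List<Optional<String>> removeConcurrently(final ToyPool pool) {
        final ExecutorService executor = Executors.newFixedThreadPool(2);
        final CountDownLatch startGate = new CountDownLatch(1);
        try {
            final List<Future<Optional<String>>> futures = new ArrayList<>();
            for (int i = 0; i < 2; i++) {
                futures.add(executor.submit(() -> {
                    startGate.await();
                    return pool.removeOne();
                }));
            }
            startGate.countDown();
            final List<Optional<String>> results = new ArrayList<>();
            for (final Future<Optional<String>> future : futures) {
                results.add(future.get());
            }
            return results;
        } catch (final InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(final String[] args) {
        final ToyPool pool = new ToyPool(3);
        System.out.println(removeConcurrently(pool) + ", remaining: " + pool.size());
    }
}
```

A test built on this would assert that the two returned names are present and distinct and that the pool shrank by exactly two. As noted above, a passing run does not prove the absence of a race.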







[GitHub] [kafka] ableegoldman commented on a change in pull request #9695: KAFKA-10500: Remove thread

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #9695:
URL: https://github.com/apache/kafka/pull/9695#discussion_r553079791



##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java
##########
@@ -128,6 +133,7 @@ public void shouldRemoveStreamThread() throws Exception {
             final int oldThreadCount = kafkaStreams.localThreadsMetadata().size();
             assertThat(kafkaStreams.removeStreamThread().get().split("-")[0], equalTo(appId));
             assertThat(kafkaStreams.localThreadsMetadata().size(), equalTo(oldThreadCount - 1));
+            waitForApplicationState(Collections.singletonList(kafkaStreams), KafkaStreams.State.RUNNING, DEFAULT_DURATION);

Review comment:
       Cool, what you did below (`waitForApplicationState` on REBALANCING before RUNNING) looks good. But I think you missed doing that here -- in fact we should probably do a similar thing in all the tests in this class. Can you just give the file a final pass and make sure we verify the REBALANCING -> RUNNING transition in all of the tests?







[GitHub] [kafka] wcarlson5 commented on a change in pull request #9695: KAFKA-10500: Remove thread

Posted by GitBox <gi...@apache.org>.
wcarlson5 commented on a change in pull request #9695:
URL: https://github.com/apache/kafka/pull/9695#discussion_r553016839



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -947,24 +948,65 @@ private StreamThread createStreamThread(final long cacheSizePerThread, final int
      * @return name of the added stream thread or empty if a new stream thread could not be added
      */
     public Optional<String> addStreamThread() {
-        synchronized (changeThreadCount) {
-            if (isRunningOrRebalancing()) {
-                final int threadIdx = getNextThreadIndex();
-                final long cacheSizePerThread = getCacheSizePerThread(threads.size() + 1);
+        if (isRunningOrRebalancing()) {
+            final int threadIdx;
+            final long cacheSizePerThread;
+            synchronized (changeThreadCount) {
+                threadIdx = getNextThreadIndex();
+                cacheSizePerThread = getCacheSizePerThread(threads.size() + 1);
                 resizeThreadCache(cacheSizePerThread);
-                final StreamThread streamThread = createStreamThread(cacheSizePerThread, threadIdx);
-                synchronized (stateLock) {
-                    if (isRunningOrRebalancing()) {
-                        streamThread.start();
-                        return Optional.of(streamThread.getName());
-                    } else {
+            }
+            final StreamThread streamThread = createStreamThread(cacheSizePerThread, threadIdx);
+
+            synchronized (stateLock) {
+                if (isRunningOrRebalancing()) {
+                    streamThread.start();
+                    return Optional.of(streamThread.getName());
+                } else {
+                    streamThread.shutdown();
+                    threads.remove(streamThread);
+                    resizeThreadCache(getCacheSizePerThread(threads.size()));
+                    log.warn("Cannot add a stream thread in state " + state());
+                    return Optional.empty();
+                }
+            }
+        }
+        log.warn("Cannot add a stream thread in state " + state());
+        return Optional.empty();
+    }
+
+    /**
+     * Removes one stream thread out of the running stream threads from this Kafka Streams client.
+     * <p>
+     * The removed stream thread is gracefully shut down. This method does not specify which stream
+     * thread is shut down.
+     * <p>
+     * Since the number of stream threads decreases, the sizes of the caches in the remaining stream
+     * threads are adapted so that the sum of the cache sizes over all stream threads equals the total
+     * cache size specified in configuration {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG}.
+     *
+     * @return name of the removed stream thread or empty if a stream thread could not be removed because
+     *         no stream threads are alive
+     */
+    public Optional<String> removeStreamThread() {
+        if (isRunningOrRebalancing()) {
+            synchronized (changeThreadCount) {
+                for (final StreamThread streamThread : threads) {
+                    if (streamThread.isAlive()) {
                         streamThread.shutdown();
+                        if (!streamThread.getName().equals(Thread.currentThread().getName())) {

Review comment:
       Cool, I'll make those changes. Basically, a thread can only remove itself if it is the only thread left.
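
The rule stated here can be sketched as a small selection function. This is only an illustration over plain thread names. `pickThreadToRemove` is a hypothetical helper, and the liveness check from the PR is assumed to have happened before the list is passed in.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;

class RemovalCandidateSketch {

    // Picks the first name that is not the caller's own, unless the caller is the
    // only entry left, in which case the caller itself is chosen.
    static Optional<String> pickThreadToRemove(final List<String> aliveNames, final String callerName) {
        for (final String name : aliveNames) {
            if (!name.equals(callerName) || aliveNames.size() == 1) {
                return Optional.of(name);
            }
        }
        return Optional.empty();
    }

    public static void main(final String[] args) {
        final List<String> two = Arrays.asList("app-StreamThread-1", "app-StreamThread-2");
        final List<String> one = Collections.singletonList("app-StreamThread-1");
        System.out.println(pickThreadToRemove(two, "app-StreamThread-1"));
        System.out.println(pickThreadToRemove(one, "app-StreamThread-1"));
    }
}
```

When the caller is not a stream thread at all, its name matches nothing in the list and the first entry is chosen.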







[GitHub] [kafka] wcarlson5 commented on a change in pull request #9695: KAFKA-10500: Remove thread

Posted by GitBox <gi...@apache.org>.
wcarlson5 commented on a change in pull request #9695:
URL: https://github.com/apache/kafka/pull/9695#discussion_r553007774



##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java
##########
@@ -97,15 +99,132 @@ public void shouldAddStreamThread() throws Exception {
 
             final Optional<String> name = kafkaStreams.addStreamThread();
 
-            assertThat(name, CoreMatchers.not(Optional.empty()));
+            assertThat(name, not(Optional.empty()));
             TestUtils.waitForCondition(
                 () -> kafkaStreams.localThreadsMetadata().stream().sequential()
                         .map(ThreadMetadata::threadName).anyMatch(t -> t.equals(name.orElse(""))),
                 "Wait for the thread to be added"
             );
             assertThat(kafkaStreams.localThreadsMetadata().size(), equalTo(oldThreadCount + 1));
-            assertThat(kafkaStreams.localThreadsMetadata().stream().map(t -> t.threadName().split("-StreamThread-")[1]).sorted().toArray(), equalTo(new String[] {"1", "2", "3"}));
-            TestUtils.waitForCondition(() -> kafkaStreams.state() == KafkaStreams.State.RUNNING, "wait for running");
+            assertThat(
+                kafkaStreams
+                    .localThreadsMetadata()
+                    .stream()
+                    .map(t -> t.threadName().split("-StreamThread-")[1])
+                    .sorted().toArray(),
+                equalTo(new String[] {"1", "2", "3"})
+            );
+            TestUtils.waitForCondition(
+                () -> kafkaStreams.state() == KafkaStreams.State.RUNNING,
+                "Kafka Streams client did not reach state RUNNING"
+            );
+        }
+    }
+
+    @Test
+    public void shouldRemoveStreamThread() throws Exception {

Review comment:
       Well, right now it would not stay in RUNNING until https://github.com/apache/kafka/pull/9720/files gets merged, so we can test that corner case in a follow-up PR.







[GitHub] [kafka] ableegoldman commented on a change in pull request #9695: KAFKA-10500: Remove thread

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #9695:
URL: https://github.com/apache/kafka/pull/9695#discussion_r538911768



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -924,28 +924,69 @@ private StreamThread createStreamThread(final long cacheSizePerThread, final int
      * @return name of the added stream thread or empty if a new stream thread could not be added
      */
     public Optional<String> addStreamThread() {
-        synchronized (changeThreadCount) {
-            if (isRunningOrRebalancing()) {
-                final int threadIdx = getNextThreadIndex();
-                final long cacheSizePerThread = getCacheSizePerThread(threads.size() + 1);
+        if (isRunningOrRebalancing()) {
+            final int threadIdx;
+            final long cacheSizePerThread;
+            synchronized (changeThreadCount) {
+                threadIdx = getNextThreadIndex();
+                cacheSizePerThread = getCacheSizePerThread(threads.size() + 1);
                 resizeThreadCache(cacheSizePerThread);
-                final StreamThread streamThread = createStreamThread(cacheSizePerThread, threadIdx);
-                synchronized (stateLock) {
-                    if (isRunningOrRebalancing()) {
-                        streamThread.start();
-                        return Optional.of(streamThread.getName());
-                    } else {
-                        streamThread.shutdown();
+            }
+            final StreamThread streamThread = createStreamThread(cacheSizePerThread, threadIdx);
+
+            synchronized (stateLock) {
+                if (isRunningOrRebalancing()) {
+                    streamThread.start();
+                    return Optional.of(streamThread.getName());
+                } else {
+                    streamThread.shutdown();
+                    threads.remove(streamThread);
+                    resizeThreadCache(getCacheSizePerThread(threads.size()));
+                    return Optional.empty();
+                }
+            }
+        }
+        return Optional.empty();
+    }
+
+    /**
+     * Removes one stream thread out of the running stream threads from this Kafka Streams client.
+     *
+     * The removed stream thread is gracefully shut down. This method does not specify which stream
+     * thread is shut down.
+     *
+     * Since the number of stream threads decreases, the sizes of the caches in the remaining stream
+     * threads are adapted so that the sum of the cache sizes over all stream threads equals the total
+     * cache size specified in configuration {@code cache.max.bytes.buffering}.
+     *
+     * @return name of the removed stream thread or empty if a stream thread could not be removed because
+     *         no stream threads are alive
+     */
+    public Optional<String> removeStreamThread() {
+        if (isRunningOrRebalancing()) {
+            for (final StreamThread streamThread : threads) {
+                if (streamThread.isAlive()) {
+                    streamThread.shutdown();
+                    while (streamThread.state() != StreamThread.State.DEAD && !streamThread.getName().equals(Thread.currentThread().getName())) {

Review comment:
       Ah right, you mean if `removeStreamThread()` is called from the exception handler. Thanks for clarifying.







[GitHub] [kafka] ableegoldman commented on a change in pull request #9695: KAFKA-10500: Remove thread

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #9695:
URL: https://github.com/apache/kafka/pull/9695#discussion_r553002043



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -947,24 +948,65 @@ private StreamThread createStreamThread(final long cacheSizePerThread, final int
      * @return name of the added stream thread or empty if a new stream thread could not be added
      */
     public Optional<String> addStreamThread() {
-        synchronized (changeThreadCount) {
-            if (isRunningOrRebalancing()) {
-                final int threadIdx = getNextThreadIndex();
-                final long cacheSizePerThread = getCacheSizePerThread(threads.size() + 1);
+        if (isRunningOrRebalancing()) {
+            final int threadIdx;
+            final long cacheSizePerThread;
+            synchronized (changeThreadCount) {
+                threadIdx = getNextThreadIndex();
+                cacheSizePerThread = getCacheSizePerThread(threads.size() + 1);
                 resizeThreadCache(cacheSizePerThread);
-                final StreamThread streamThread = createStreamThread(cacheSizePerThread, threadIdx);
-                synchronized (stateLock) {
-                    if (isRunningOrRebalancing()) {
-                        streamThread.start();
-                        return Optional.of(streamThread.getName());
-                    } else {
+            }
+            final StreamThread streamThread = createStreamThread(cacheSizePerThread, threadIdx);
+
+            synchronized (stateLock) {
+                if (isRunningOrRebalancing()) {
+                    streamThread.start();
+                    return Optional.of(streamThread.getName());
+                } else {
+                    streamThread.shutdown();
+                    threads.remove(streamThread);
+                    resizeThreadCache(getCacheSizePerThread(threads.size()));
+                    log.warn("Cannot add a stream thread in state " + state());
+                    return Optional.empty();
+                }
+            }
+        }
+        log.warn("Cannot add a stream thread in state " + state());
+        return Optional.empty();
+    }
+
+    /**
+     * Removes one stream thread out of the running stream threads from this Kafka Streams client.
+     * <p>
+     * The removed stream thread is gracefully shut down. This method does not specify which stream
+     * thread is shut down.
+     * <p>
+     * Since the number of stream threads decreases, the sizes of the caches in the remaining stream
+     * threads are adapted so that the sum of the cache sizes over all stream threads equals the total
+     * cache size specified in configuration {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG}.
+     *
+     * @return name of the removed stream thread or empty if a stream thread could not be removed because
+     *         no stream threads are alive
+     */
+    public Optional<String> removeStreamThread() {
+        if (isRunningOrRebalancing()) {
+            synchronized (changeThreadCount) {
+                for (final StreamThread streamThread : threads) {
+                    if (streamThread.isAlive()) {
                         streamThread.shutdown();
+                        if (!streamThread.getName().equals(Thread.currentThread().getName())) {

Review comment:
       I know Bruno brought this up already and maybe I just missed the resolution in the previous comments, but: should we allow a StreamThread to remove itself? Originally I was thinking "no", but I was just thinking about what would be the expected behavior of this method when called from a StreamThread, and I actually think we should consider ONLY removing the calling thread.
   I get the sense that users will interpret `removeStreamThread()`, when called from a StreamThread, as essentially saying "shut down this thread", not "remove some random thread". WDYT?







[GitHub] [kafka] wcarlson5 commented on a change in pull request #9695: KAFKA-10500: Remove thread

Posted by GitBox <gi...@apache.org>.
wcarlson5 commented on a change in pull request #9695:
URL: https://github.com/apache/kafka/pull/9695#discussion_r555141563



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -1432,7 +1480,9 @@ public void cleanUp() {
         validateIsRunningOrRebalancing();
         final Set<ThreadMetadata> threadMetadata = new HashSet<>();
         for (final StreamThread thread : threads) {
-            threadMetadata.add(thread.threadMetadata());
+            if (thread.state() != StreamThread.State.DEAD) {
+                threadMetadata.add(thread.threadMetadata());
+            }
         }
         return threadMetadata;

Review comment:
       Since changing the thread metadata uses a copy, once it is added to the list as not dead it ensures that we won't return a dead thread. And after we add it to the list we don't need to care until the method is called again.
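
A rough sketch of the point-in-time behaviour described here, using toy types rather than `StreamThread` and `ThreadMetadata`. `ToyThread`, `State`, and `liveThreadNames` are hypothetical. The returned set is built once from the threads that were not dead at that moment, and later state changes do not alter it.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class MetadataSnapshotSketch {

    enum State { RUNNING, DEAD }

    // Hypothetical stand-in for a stream thread with a mutable state.
    static final class ToyThread {
        final String name;
        volatile State state;

        ToyThread(final String name, final State state) {
            this.name = name;
            this.state = state;
        }
    }

    // Copies the names of all threads that are not dead at the time of the call.
    static Set<String> liveThreadNames(final List<ToyThread> threads) {
        final Set<String> snapshot = new HashSet<>();
        for (final ToyThread thread : threads) {
            if (thread.state != State.DEAD) {
                snapshot.add(thread.name);
            }
        }
        return snapshot;
    }

    public static void main(final String[] args) {
        final List<ToyThread> threads = new ArrayList<>();
        threads.add(new ToyThread("app-StreamThread-1", State.RUNNING));
        threads.add(new ToyThread("app-StreamThread-2", State.DEAD));
        System.out.println(liveThreadNames(threads));
    }
}
```

A thread that dies right after the call still appears in that earlier snapshot and only drops out the next time the method runs.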







[GitHub] [kafka] cadonna commented on a change in pull request #9695: KAFKA-10500: Remove thread

Posted by GitBox <gi...@apache.org>.
cadonna commented on a change in pull request #9695:
URL: https://github.com/apache/kafka/pull/9695#discussion_r540225034



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -924,25 +924,64 @@ private StreamThread createStreamThread(final long cacheSizePerThread, final int
      * @return name of the added stream thread or empty if a new stream thread could not be added
      */
     public Optional<String> addStreamThread() {
-        synchronized (changeThreadCount) {
-            if (isRunningOrRebalancing()) {
-                final int threadIdx = getNextThreadIndex();
-                final long cacheSizePerThread = getCacheSizePerThread(threads.size() + 1);
+        if (isRunningOrRebalancing()) {
+            final int threadIdx;
+            final long cacheSizePerThread;
+            synchronized (changeThreadCount) {
+                threadIdx = getNextThreadIndex();
+                cacheSizePerThread = getCacheSizePerThread(threads.size() + 1);
                 resizeThreadCache(cacheSizePerThread);
-                final StreamThread streamThread = createStreamThread(cacheSizePerThread, threadIdx);
-                synchronized (stateLock) {
-                    if (isRunningOrRebalancing()) {
-                        streamThread.start();
-                        return Optional.of(streamThread.getName());
-                    } else {
-                        streamThread.shutdown();
+            }
+            final StreamThread streamThread = createStreamThread(cacheSizePerThread, threadIdx);
+
+            synchronized (stateLock) {
+                if (isRunningOrRebalancing()) {
+                    streamThread.start();
+                    return Optional.of(streamThread.getName());
+                } else {
+                    streamThread.shutdown();
+                    threads.remove(streamThread);
+                    resizeThreadCache(getCacheSizePerThread(threads.size()));
+                    log.warn("Cannot add a stream thread in state " + state());
+                    return Optional.empty();
+                }
+            }
+        }
+        log.warn("Cannot add a stream thread in state " + state());
+        return Optional.empty();
+    }
+
+    /**
+     * Removes one stream thread out of the running stream threads from this Kafka Streams client.
+     * <p>
+     * The removed stream thread is gracefully shut down. This method does not specify which stream
+     * thread is shut down.
+     * <p>
+     * Since the number of stream threads decreases, the sizes of the caches in the remaining stream
+     * threads are adapted so that the sum of the cache sizes over all stream threads equals the total
+     * cache size specified in configuration {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG}.
+     *
+     * @return name of the removed stream thread or empty if a stream thread could not be removed because
+     *         no stream threads are alive
+     */
+    public Optional<String> removeStreamThread() {
+        if (isRunningOrRebalancing()) {
+            for (final StreamThread streamThread : threads) {
+                if (streamThread.isAlive()) {

Review comment:
       Additionally, we should remove dead stream threads from the list if we encounter any here.







[GitHub] [kafka] ableegoldman commented on a change in pull request #9695: KAFKA-10500: Remove thread

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #9695:
URL: https://github.com/apache/kafka/pull/9695#discussion_r553079791



##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java
##########
@@ -128,6 +133,7 @@ public void shouldRemoveStreamThread() throws Exception {
             final int oldThreadCount = kafkaStreams.localThreadsMetadata().size();
             assertThat(kafkaStreams.removeStreamThread().get().split("-")[0], equalTo(appId));
             assertThat(kafkaStreams.localThreadsMetadata().size(), equalTo(oldThreadCount - 1));
+            waitForApplicationState(Collections.singletonList(kafkaStreams), KafkaStreams.State.RUNNING, DEFAULT_DURATION);

Review comment:
       Cool, I think your plan to just `waitForApplicationState` on REBALANCING before RUNNING sounds good. But I think you missed doing that here -- in fact we should probably do a similar thing in all the tests in this class. Can you just give the file a final pass and make sure we verify the REBALANCING -> RUNNING transition in all of the tests?







[GitHub] [kafka] wcarlson5 commented on a change in pull request #9695: KAFKA-10500: Remove thread

Posted by GitBox <gi...@apache.org>.
wcarlson5 commented on a change in pull request #9695:
URL: https://github.com/apache/kafka/pull/9695#discussion_r541047802



##########
File path: streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
##########
@@ -631,6 +634,23 @@ public void shouldNotAddThreadWhenError() {
         assertThat(streams.threads.size(), equalTo(oldSize));
     }
 
+    @Test
+    public void shouldRemoveThread() throws InterruptedException {
+        props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2);
+        final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time);
+        streams.start();
+        final int oldSize = streams.threads.size();
+        TestUtils.waitForCondition(() -> streams.state() == KafkaStreams.State.RUNNING, 15L, "wait until running");
+        assertThat(streams.removeStreamThread(), equalTo(Optional.of("newThread")));
+        assertThat(streams.threads.size(), equalTo(oldSize - 1));
+    }
+
+    @Test
+    public void shouldNotRemoveThreadWhenNotRunning() {
+        props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1);
+        final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time);
+        assertThat(streams.removeStreamThread(), equalTo(Optional.empty()));
+    }
 

Review comment:
       Sure, added.







[GitHub] [kafka] wcarlson5 commented on a change in pull request #9695: KAFKA-10500: Remove thread

Posted by GitBox <gi...@apache.org>.
wcarlson5 commented on a change in pull request #9695:
URL: https://github.com/apache/kafka/pull/9695#discussion_r541063459



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -924,25 +924,64 @@ private StreamThread createStreamThread(final long cacheSizePerThread, final int
      * @return name of the added stream thread or empty if a new stream thread could not be added
      */
     public Optional<String> addStreamThread() {
-        synchronized (changeThreadCount) {
-            if (isRunningOrRebalancing()) {
-                final int threadIdx = getNextThreadIndex();
-                final long cacheSizePerThread = getCacheSizePerThread(threads.size() + 1);
+        if (isRunningOrRebalancing()) {
+            final int threadIdx;
+            final long cacheSizePerThread;
+            synchronized (changeThreadCount) {
+                threadIdx = getNextThreadIndex();
+                cacheSizePerThread = getCacheSizePerThread(threads.size() + 1);
                 resizeThreadCache(cacheSizePerThread);
-                final StreamThread streamThread = createStreamThread(cacheSizePerThread, threadIdx);
-                synchronized (stateLock) {
-                    if (isRunningOrRebalancing()) {
-                        streamThread.start();
-                        return Optional.of(streamThread.getName());
-                    } else {
-                        streamThread.shutdown();
+            }
+            final StreamThread streamThread = createStreamThread(cacheSizePerThread, threadIdx);
+
+            synchronized (stateLock) {
+                if (isRunningOrRebalancing()) {
+                    streamThread.start();
+                    return Optional.of(streamThread.getName());
+                } else {
+                    streamThread.shutdown();
+                    threads.remove(streamThread);
+                    resizeThreadCache(getCacheSizePerThread(threads.size()));
+                    log.warn("Cannot add a stream thread in state " + state());
+                    return Optional.empty();
+                }
+            }
+        }
+        log.warn("Cannot add a stream thread in state " + state());
+        return Optional.empty();
+    }
+
+    /**
+     * Removes one stream thread out of the running stream threads from this Kafka Streams client.
+     * <p>
+     * The removed stream thread is gracefully shut down. This method does not specify which stream
+     * thread is shut down.
+     * <p>
+     * Since the number of stream threads decreases, the sizes of the caches in the remaining stream
+     * threads are adapted so that the sum of the cache sizes over all stream threads equals the total
+     * cache size specified in configuration {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG}.
+     *
+     * @return name of the removed stream thread or empty if a stream thread could not be removed because
+     *         no stream threads are alive
+     */
+    public Optional<String> removeStreamThread() {
+        if (isRunningOrRebalancing()) {
+            for (final StreamThread streamThread : threads) {
+                if (streamThread.isAlive()) {
+                    streamThread.shutdown();
+                    if (!streamThread.getName().equals(Thread.currentThread().getName())) {
+                        streamThread.waitOnThreadState(StreamThread.State.DEAD);
+                    }
+                    synchronized (changeThreadCount) {
+                        final long cacheSizePerThread = threads.size() == 1 ? 0 : getCacheSizePerThread(threads.size() - 1);
+                        resizeThreadCache(cacheSizePerThread);

Review comment:
       I agree, I completely forgot to add a resize method to the global thread. Added.







[GitHub] [kafka] ableegoldman commented on a change in pull request #9695: KAFKA-10500: Remove thread

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #9695:
URL: https://github.com/apache/kafka/pull/9695#discussion_r553008933



##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java
##########
@@ -97,15 +99,132 @@ public void shouldAddStreamThread() throws Exception {
 
             final Optional<String> name = kafkaStreams.addStreamThread();
 
-            assertThat(name, CoreMatchers.not(Optional.empty()));
+            assertThat(name, not(Optional.empty()));
             TestUtils.waitForCondition(
                 () -> kafkaStreams.localThreadsMetadata().stream().sequential()
                         .map(ThreadMetadata::threadName).anyMatch(t -> t.equals(name.orElse(""))),
                 "Wait for the thread to be added"
             );
             assertThat(kafkaStreams.localThreadsMetadata().size(), equalTo(oldThreadCount + 1));
-            assertThat(kafkaStreams.localThreadsMetadata().stream().map(t -> t.threadName().split("-StreamThread-")[1]).sorted().toArray(), equalTo(new String[] {"1", "2", "3"}));
-            TestUtils.waitForCondition(() -> kafkaStreams.state() == KafkaStreams.State.RUNNING, "wait for running");
+            assertThat(
+                kafkaStreams
+                    .localThreadsMetadata()
+                    .stream()
+                    .map(t -> t.threadName().split("-StreamThread-")[1])
+                    .sorted().toArray(),
+                equalTo(new String[] {"1", "2", "3"})
+            );
+            TestUtils.waitForCondition(
+                () -> kafkaStreams.state() == KafkaStreams.State.RUNNING,
+                "Kafka Streams client did not reach state RUNNING"
+            );
+        }
+    }
+
+    @Test
+    public void shouldRemoveStreamThread() throws Exception {

Review comment:
       Ah, right. Would it make sense to add this test in the above PR instead? Or, as a middle ground, you could add the test in this PR and just leave out (1) for now, then add that check in the other PR.







[GitHub] [kafka] cadonna commented on a change in pull request #9695: KAFKA-10500: Remove thread

Posted by GitBox <gi...@apache.org>.
cadonna commented on a change in pull request #9695:
URL: https://github.com/apache/kafka/pull/9695#discussion_r553380512



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -947,24 +948,65 @@ private StreamThread createStreamThread(final long cacheSizePerThread, final int
      * @return name of the added stream thread or empty if a new stream thread could not be added
      */
     public Optional<String> addStreamThread() {
-        synchronized (changeThreadCount) {
-            if (isRunningOrRebalancing()) {
-                final int threadIdx = getNextThreadIndex();
-                final long cacheSizePerThread = getCacheSizePerThread(threads.size() + 1);
+        if (isRunningOrRebalancing()) {
+            final int threadIdx;
+            final long cacheSizePerThread;
+            synchronized (changeThreadCount) {
+                threadIdx = getNextThreadIndex();
+                cacheSizePerThread = getCacheSizePerThread(threads.size() + 1);
                 resizeThreadCache(cacheSizePerThread);
-                final StreamThread streamThread = createStreamThread(cacheSizePerThread, threadIdx);
-                synchronized (stateLock) {
-                    if (isRunningOrRebalancing()) {
-                        streamThread.start();
-                        return Optional.of(streamThread.getName());
-                    } else {
+            }
+            final StreamThread streamThread = createStreamThread(cacheSizePerThread, threadIdx);

Review comment:
       nit: Could you please rename `createStreamThread()` to `createAndAddStreamThread()`? It is a bit weird that we have `threads.remove()` in this method but no `threads.add()`. The renaming would make it clearer.

##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -1432,7 +1480,9 @@ public void cleanUp() {
         validateIsRunningOrRebalancing();
         final Set<ThreadMetadata> threadMetadata = new HashSet<>();
         for (final StreamThread thread : threads) {
-            threadMetadata.add(thread.threadMetadata());
+            if (thread.state() != StreamThread.State.DEAD) {
+                threadMetadata.add(thread.threadMetadata());
+            }
         }
         return threadMetadata;

Review comment:
       Don't we need to synchronize this block on `changeThreadCount` to at least guarantee that the state is correct when the method returns? Otherwise, between the `if` and `threadMetadata.add(thread.threadMetadata())`, the stream thread might transition to `DEAD`.
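
A minimal sketch of the check-then-act concern raised above, using a hypothetical `ThreadRegistry` rather than the real `KafkaStreams` class. It assumes, as a simplification, that every state change and every list change happens under the same `changeThreadCount` lock; a real stream thread can also die on its own, which this lock would not prevent.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical registry; only illustrates holding one lock across the check and the add. */
final class ThreadRegistry {
    enum State { RUNNING, DEAD }

    static final class Worker {
        final String name;
        State state = State.RUNNING; // guarded by changeThreadCount in this sketch

        Worker(final String name) {
            this.name = name;
        }
    }

    private final Object changeThreadCount = new Object();
    private final List<Worker> workers = new ArrayList<>();

    void add(final Worker worker) {
        synchronized (changeThreadCount) {
            workers.add(worker);
        }
    }

    void remove(final Worker worker) {
        synchronized (changeThreadCount) {
            worker.state = State.DEAD;
            workers.remove(worker);
        }
    }

    /** The state check and the add happen under the lock, so remove() cannot interleave between them. */
    Set<String> liveNames() {
        final Set<String> names = new HashSet<>();
        synchronized (changeThreadCount) {
            for (final Worker worker : workers) {
                if (worker.state != State.DEAD) {
                    names.add(worker.name);
                }
            }
        }
        return names;
    }
}
```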

##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -947,24 +948,65 @@ private StreamThread createStreamThread(final long cacheSizePerThread, final int
      * @return name of the added stream thread or empty if a new stream thread could not be added
      */
     public Optional<String> addStreamThread() {
-        synchronized (changeThreadCount) {
-            if (isRunningOrRebalancing()) {
-                final int threadIdx = getNextThreadIndex();
-                final long cacheSizePerThread = getCacheSizePerThread(threads.size() + 1);
+        if (isRunningOrRebalancing()) {
+            final int threadIdx;
+            final long cacheSizePerThread;
+            synchronized (changeThreadCount) {
+                threadIdx = getNextThreadIndex();
+                cacheSizePerThread = getCacheSizePerThread(threads.size() + 1);
                 resizeThreadCache(cacheSizePerThread);
-                final StreamThread streamThread = createStreamThread(cacheSizePerThread, threadIdx);
-                synchronized (stateLock) {
-                    if (isRunningOrRebalancing()) {
-                        streamThread.start();
-                        return Optional.of(streamThread.getName());
-                    } else {
+            }
+            final StreamThread streamThread = createStreamThread(cacheSizePerThread, threadIdx);
+
+            synchronized (stateLock) {
+                if (isRunningOrRebalancing()) {
+                    streamThread.start();
+                    return Optional.of(streamThread.getName());
+                } else {
+                    streamThread.shutdown();
+                    threads.remove(streamThread);
+                    resizeThreadCache(getCacheSizePerThread(threads.size()));
+                    log.warn("Cannot add a stream thread in state " + state());
+                    return Optional.empty();
+                }
+            }
+        }
+        log.warn("Cannot add a stream thread in state " + state());
+        return Optional.empty();
+    }
+
+    /**
+     * Removes one stream thread out of the running stream threads from this Kafka Streams client.
+     * <p>
+     * The removed stream thread is gracefully shut down. This method does not specify which stream
+     * thread is shut down.
+     * <p>
+     * Since the number of stream threads decreases, the sizes of the caches in the remaining stream
+     * threads are adapted so that the sum of the cache sizes over all stream threads equals the total
+     * cache size specified in configuration {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG}.
+     *
+     * @return name of the removed stream thread or empty if a stream thread could not be removed because
+     *         no stream threads are alive
+     */
+    public Optional<String> removeStreamThread() {
+        if (isRunningOrRebalancing()) {
+            synchronized (changeThreadCount) {
+                for (final StreamThread streamThread : threads) {
+                    if (streamThread.isAlive() && (!streamThread.getName().equals(Thread.currentThread().getName()) || threads.size() == 1)) {
                         streamThread.shutdown();
+                        if (!streamThread.getName().equals(Thread.currentThread().getName())) {
+                            streamThread.waitOnThreadState(StreamThread.State.DEAD);
+                        }
                         threads.remove(streamThread);
-                        resizeThreadCache(getCacheSizePerThread(threads.size()));
-                        return Optional.empty();
+                        final long cacheSizePerThread = getCacheSizePerThread(threads.size());
+                        resizeThreadCache(cacheSizePerThread);
+                        return Optional.of(streamThread.getName());
                     }
                 }
             }
+            log.warn("There are no threads eligible for removal");
+        } else {
+            log.warn("Cannot remove a stream thread in state " + state());

Review comment:
       See my nit comment above. 

##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -947,24 +948,65 @@ private StreamThread createStreamThread(final long cacheSizePerThread, final int
      * @return name of the added stream thread or empty if a new stream thread could not be added
      */
     public Optional<String> addStreamThread() {
-        synchronized (changeThreadCount) {
-            if (isRunningOrRebalancing()) {
-                final int threadIdx = getNextThreadIndex();
-                final long cacheSizePerThread = getCacheSizePerThread(threads.size() + 1);
+        if (isRunningOrRebalancing()) {
+            final int threadIdx;
+            final long cacheSizePerThread;
+            synchronized (changeThreadCount) {
+                threadIdx = getNextThreadIndex();
+                cacheSizePerThread = getCacheSizePerThread(threads.size() + 1);
                 resizeThreadCache(cacheSizePerThread);
-                final StreamThread streamThread = createStreamThread(cacheSizePerThread, threadIdx);
-                synchronized (stateLock) {
-                    if (isRunningOrRebalancing()) {
-                        streamThread.start();
-                        return Optional.of(streamThread.getName());
-                    } else {
+            }
+            final StreamThread streamThread = createStreamThread(cacheSizePerThread, threadIdx);
+
+            synchronized (stateLock) {
+                if (isRunningOrRebalancing()) {
+                    streamThread.start();
+                    return Optional.of(streamThread.getName());
+                } else {
+                    streamThread.shutdown();
+                    threads.remove(streamThread);
+                    resizeThreadCache(getCacheSizePerThread(threads.size()));
+                    log.warn("Cannot add a stream thread in state " + state());
+                    return Optional.empty();

Review comment:
       nit: To deduplicate code, you could also remove these two lines, because they are the same as the last two lines of the method.
   
   nit: Could you change `"Cannot add a stream thread in state " + state()` to `"Cannot add a stream thread when Kafka Streams client is in state " + state()`, or similar? Currently, it is not completely clear whether the state belongs to the stream thread or to the client.

##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -424,6 +424,7 @@ public void setUncaughtExceptionHandler(final StreamsUncaughtExceptionHandler st
 
     private void defaultStreamsUncaughtExceptionHandler(final Throwable throwable) {
         if (oldHandler) {
+            threads.remove((StreamThread) Thread.currentThread());

Review comment:
       nit: My IDE says that the cast is not needed.







[GitHub] [kafka] cadonna commented on a change in pull request #9695: KAFKA-10500: Remove thread

Posted by GitBox <gi...@apache.org>.
cadonna commented on a change in pull request #9695:
URL: https://github.com/apache/kafka/pull/9695#discussion_r540857996



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -924,25 +924,64 @@ private StreamThread createStreamThread(final long cacheSizePerThread, final int
      * @return name of the added stream thread or empty if a new stream thread could not be added
      */
     public Optional<String> addStreamThread() {
-        synchronized (changeThreadCount) {
-            if (isRunningOrRebalancing()) {
-                final int threadIdx = getNextThreadIndex();
-                final long cacheSizePerThread = getCacheSizePerThread(threads.size() + 1);
+        if (isRunningOrRebalancing()) {
+            final int threadIdx;
+            final long cacheSizePerThread;
+            synchronized (changeThreadCount) {
+                threadIdx = getNextThreadIndex();
+                cacheSizePerThread = getCacheSizePerThread(threads.size() + 1);
                 resizeThreadCache(cacheSizePerThread);
-                final StreamThread streamThread = createStreamThread(cacheSizePerThread, threadIdx);
-                synchronized (stateLock) {
-                    if (isRunningOrRebalancing()) {
-                        streamThread.start();
-                        return Optional.of(streamThread.getName());
-                    } else {
-                        streamThread.shutdown();
+            }
+            final StreamThread streamThread = createStreamThread(cacheSizePerThread, threadIdx);
+
+            synchronized (stateLock) {
+                if (isRunningOrRebalancing()) {
+                    streamThread.start();
+                    return Optional.of(streamThread.getName());
+                } else {
+                    streamThread.shutdown();
+                    threads.remove(streamThread);
+                    resizeThreadCache(getCacheSizePerThread(threads.size()));
+                    log.warn("Cannot add a stream thread in state " + state());
+                    return Optional.empty();
+                }
+            }
+        }
+        log.warn("Cannot add a stream thread in state " + state());
+        return Optional.empty();
+    }
+
+    /**
+     * Removes one stream thread out of the running stream threads from this Kafka Streams client.
+     * <p>
+     * The removed stream thread is gracefully shut down. This method does not specify which stream
+     * thread is shut down.
+     * <p>
+     * Since the number of stream threads decreases, the sizes of the caches in the remaining stream
+     * threads are adapted so that the sum of the cache sizes over all stream threads equals the total
+     * cache size specified in configuration {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG}.
+     *
+     * @return name of the removed stream thread or empty if a stream thread could not be removed because
+     *         no stream threads are alive
+     */
+    public Optional<String> removeStreamThread() {
+        if (isRunningOrRebalancing()) {
+            for (final StreamThread streamThread : threads) {
+                if (streamThread.isAlive()) {

Review comment:
       Good point about not removing stream threads in `PENDING_SHUTDOWN`. 
   
   > There should not be any dead threads; this check is to make sure another call to removeThread is not removing the same thread.
   
   Right! Thanks to the new uncaught exception handler! 😀 







[GitHub] [kafka] cadonna commented on a change in pull request #9695: KAFKA-10500: Remove thread

Posted by GitBox <gi...@apache.org>.
cadonna commented on a change in pull request #9695:
URL: https://github.com/apache/kafka/pull/9695#discussion_r540148911



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -924,25 +924,64 @@ private StreamThread createStreamThread(final long cacheSizePerThread, final int
      * @return name of the added stream thread or empty if a new stream thread could not be added
      */
     public Optional<String> addStreamThread() {
-        synchronized (changeThreadCount) {
-            if (isRunningOrRebalancing()) {
-                final int threadIdx = getNextThreadIndex();
-                final long cacheSizePerThread = getCacheSizePerThread(threads.size() + 1);
+        if (isRunningOrRebalancing()) {
+            final int threadIdx;
+            final long cacheSizePerThread;
+            synchronized (changeThreadCount) {
+                threadIdx = getNextThreadIndex();
+                cacheSizePerThread = getCacheSizePerThread(threads.size() + 1);
                 resizeThreadCache(cacheSizePerThread);
-                final StreamThread streamThread = createStreamThread(cacheSizePerThread, threadIdx);
-                synchronized (stateLock) {
-                    if (isRunningOrRebalancing()) {
-                        streamThread.start();
-                        return Optional.of(streamThread.getName());
-                    } else {
-                        streamThread.shutdown();
+            }
+            final StreamThread streamThread = createStreamThread(cacheSizePerThread, threadIdx);
+
+            synchronized (stateLock) {
+                if (isRunningOrRebalancing()) {
+                    streamThread.start();
+                    return Optional.of(streamThread.getName());
+                } else {
+                    streamThread.shutdown();
+                    threads.remove(streamThread);
+                    resizeThreadCache(getCacheSizePerThread(threads.size()));
+                    log.warn("Cannot add a stream thread in state " + state());
+                    return Optional.empty();
+                }
+            }
+        }
+        log.warn("Cannot add a stream thread in state " + state());
+        return Optional.empty();
+    }
+
+    /**
+     * Removes one stream thread out of the running stream threads from this Kafka Streams client.
+     * <p>
+     * The removed stream thread is gracefully shut down. This method does not specify which stream
+     * thread is shut down.
+     * <p>
+     * Since the number of stream threads decreases, the sizes of the caches in the remaining stream
+     * threads are adapted so that the sum of the cache sizes over all stream threads equals the total
+     * cache size specified in configuration {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG}.
+     *
+     * @return name of the removed stream thread or empty if a stream thread could not be removed because
+     *         no stream threads are alive
+     */
+    public Optional<String> removeStreamThread() {
+        if (isRunningOrRebalancing()) {
+            for (final StreamThread streamThread : threads) {
+                if (streamThread.isAlive()) {
+                    streamThread.shutdown();
+                    if (!streamThread.getName().equals(Thread.currentThread().getName())) {
+                        streamThread.waitOnThreadState(StreamThread.State.DEAD);
+                    }
+                    synchronized (changeThreadCount) {
+                        final long cacheSizePerThread = threads.size() == 1 ? 0 : getCacheSizePerThread(threads.size() - 1);
+                        resizeThreadCache(cacheSizePerThread);

Review comment:
       Looking at these lines, I have two questions:
   1. Why do we not also resize the cache of the global stream thread?
   2. Why do we not do all computations regarding the cache size in `getCacheSizePerThread()`?
   
   Regarding 1, I think we also need to resize the cache of the global stream thread. Otherwise, the global stream thread may get a disproportionately large share of the cache when new stream threads are added beyond the initial number of stream threads and, which I think is worse, if all stream threads are removed, we would not use all of the cache for the global stream thread.
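
A minimal sketch of what keeping all of the arithmetic in one place could look like, assuming the total cache is split evenly between the stream threads and, if present, the global stream thread. The class `CacheSizing` and its parameters are hypothetical and are not the PR's actual `getCacheSizePerThread()`.

```java
/** Hypothetical helper; splits the total cache evenly over all threads that hold a cache. */
final class CacheSizing {
    private CacheSizing() { }

    static long cacheSizePerThread(final long totalCacheSize,
                                   final int numStreamThreads,
                                   final boolean hasGlobalThread) {
        // The global stream thread counts as one more cache holder.
        final int cacheHolders = numStreamThreads + (hasGlobalThread ? 1 : 0);
        if (cacheHolders == 0) {
            // No thread is left to hold any cache.
            return 0L;
        }
        return totalCacheSize / cacheHolders;
    }
}
```

With this shape the caller no longer needs a special case such as `threads.size() == 1 ? 0 : ...`, and when all stream threads are removed the global stream thread receives the whole cache.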

##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -924,25 +924,64 @@ private StreamThread createStreamThread(final long cacheSizePerThread, final int
      * @return name of the added stream thread or empty if a new stream thread could not be added
      */
     public Optional<String> addStreamThread() {
-        synchronized (changeThreadCount) {
-            if (isRunningOrRebalancing()) {
-                final int threadIdx = getNextThreadIndex();
-                final long cacheSizePerThread = getCacheSizePerThread(threads.size() + 1);
+        if (isRunningOrRebalancing()) {
+            final int threadIdx;
+            final long cacheSizePerThread;
+            synchronized (changeThreadCount) {
+                threadIdx = getNextThreadIndex();
+                cacheSizePerThread = getCacheSizePerThread(threads.size() + 1);
                 resizeThreadCache(cacheSizePerThread);
-                final StreamThread streamThread = createStreamThread(cacheSizePerThread, threadIdx);
-                synchronized (stateLock) {
-                    if (isRunningOrRebalancing()) {
-                        streamThread.start();
-                        return Optional.of(streamThread.getName());
-                    } else {
-                        streamThread.shutdown();
+            }
+            final StreamThread streamThread = createStreamThread(cacheSizePerThread, threadIdx);
+
+            synchronized (stateLock) {
+                if (isRunningOrRebalancing()) {
+                    streamThread.start();
+                    return Optional.of(streamThread.getName());
+                } else {
+                    streamThread.shutdown();
+                    threads.remove(streamThread);
+                    resizeThreadCache(getCacheSizePerThread(threads.size()));
+                    log.warn("Cannot add a stream thread in state " + state());
+                    return Optional.empty();
+                }
+            }
+        }
+        log.warn("Cannot add a stream thread in state " + state());
+        return Optional.empty();
+    }
+
+    /**
+     * Removes one stream thread out of the running stream threads from this Kafka Streams client.
+     * <p>
+     * The removed stream thread is gracefully shut down. This method does not specify which stream
+     * thread is shut down.
+     * <p>
+     * Since the number of stream threads decreases, the sizes of the caches in the remaining stream
+     * threads are adapted so that the sum of the cache sizes over all stream threads equals the total
+     * cache size specified in configuration {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG}.
+     *
+     * @return name of the removed stream thread or empty if a stream thread could not be removed because
+     *         no stream threads are alive
+     */
+    public Optional<String> removeStreamThread() {
+        if (isRunningOrRebalancing()) {
+            for (final StreamThread streamThread : threads) {
+                if (streamThread.isAlive()) {
+                    streamThread.shutdown();
+                    if (!streamThread.getName().equals(Thread.currentThread().getName())) {

Review comment:
       Is there a reason why we want to support a stream thread removing itself? If not, I would simply not allow it and check on line 970 whether the stream thread is alive and not the calling thread. Not allowing a stream thread to remove itself would also ensure that we do not exceed the total cache size when we resize the cache.
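
A minimal sketch of the stricter selection suggested above, in which the calling thread is never chosen for removal. `RemovalCandidate` and its parameters are hypothetical; the real method iterates over `StreamThread` objects rather than names.

```java
import java.util.List;
import java.util.Optional;

/** Hypothetical helper; picks the first alive thread that is not the caller. */
final class RemovalCandidate {
    private RemovalCandidate() { }

    static Optional<String> pick(final List<String> aliveThreadNames, final String callerName) {
        return aliveThreadNames
            .stream()
            .filter(name -> !name.equals(callerName))
            .findFirst();
    }
}
```

Because the caller is never the removed thread, it can always wait for the removed thread to reach `DEAD` before the caches of the remaining threads are resized.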

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##########
@@ -597,6 +598,18 @@ public void setStreamsUncaughtExceptionHandler(final java.util.function.Consumer
         this.streamsUncaughtExceptionHandler = streamsUncaughtExceptionHandler;
     }
 
+    public void waitOnThreadState(final StreamThread.State targetState) {
+        synchronized (stateLock) {
+            while (state != targetState) {
+                try {
+                    stateLock.wait();
+                } catch (final InterruptedException e) {
+                    e.printStackTrace();

Review comment:
       Regardless of how we decide about cleanup here, we should not print the stack trace. Please remove.
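
A minimal sketch of one common alternative to printing the stack trace: restore the interrupt flag and report to the caller that the wait was cut short. The class `StateWaiter` is hypothetical, and the thread does not say which handling the PR finally adopted.

```java
/** Hypothetical waiter; shows a guarded wait loop that preserves the interrupt status. */
final class StateWaiter {
    enum State { RUNNING, PENDING_SHUTDOWN, DEAD }

    private final Object stateLock = new Object();
    private State state = State.RUNNING;

    void setState(final State newState) {
        synchronized (stateLock) {
            state = newState;
            stateLock.notifyAll(); // wake up every thread blocked in waitOnState()
        }
    }

    /** Returns true once the target state is reached, or false if the waiting thread was interrupted. */
    boolean waitOnState(final State targetState) {
        synchronized (stateLock) {
            while (state != targetState) {
                try {
                    stateLock.wait();
                } catch (final InterruptedException e) {
                    // Restore the flag so that callers further up the stack can react to it.
                    Thread.currentThread().interrupt();
                    return false;
                }
            }
            return true;
        }
    }
}
```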

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java
##########
@@ -108,4 +108,53 @@ public void shouldAddStreamThread() throws Exception {
             TestUtils.waitForCondition(() -> kafkaStreams.state() == KafkaStreams.State.RUNNING, "wait for running");
         }
     }
+
+    @Test
+    public void shouldRemoveStreamThread() throws Exception {
+        try (final KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), properties)) {
+            StreamsTestUtils.startKafkaStreamsAndWaitForRunningState(kafkaStreams);
+            final int oldThreadCount = kafkaStreams.localThreadsMetadata().size();
+            kafkaStreams.removeStreamThread();
+            assertThat(kafkaStreams.localThreadsMetadata().size(), equalTo(oldThreadCount - 1));
+        }
+    }
+
+    @Test
+    public void shouldAddAndRemoveStreamThreadsWhileKeepingNamesCorrect() throws Exception {
+        try (final KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), properties)) {
+            StreamsTestUtils.startKafkaStreamsAndWaitForRunningState(kafkaStreams);
+            int oldThreadCount = kafkaStreams.localThreadsMetadata().size();
+            assertThat(kafkaStreams.localThreadsMetadata().stream().map(t -> t.threadName().split("-StreamThread-")[1]).sorted().toArray(), equalTo(new String[] {"1", "2"}));

Review comment:
       Could you please add some line breaks? This and some of the other verifications are too long.
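
A minimal sketch of the same stream chain broken over several lines, extracted into a hypothetical helper `ThreadNameSuffixes` so that it runs without a Kafka Streams client. The thread names in the usage note are made up for illustration.

```java
import java.util.List;

/** Hypothetical helper; extracts and sorts the numeric suffix of stream thread names. */
final class ThreadNameSuffixes {
    private ThreadNameSuffixes() { }

    static String[] sortedSuffixes(final List<String> threadNames) {
        return threadNames
            .stream()
            .map(name -> name.split("-StreamThread-")[1])
            .sorted() // lexicographic order, so "10" sorts before "2"
            .toArray(String[]::new);
    }
}
```

For example, `sortedSuffixes(List.of("app-StreamThread-2", "app-StreamThread-1"))` yields the suffixes `"1"` and `"2"` in that order.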

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java
##########
@@ -108,4 +108,53 @@ public void shouldAddStreamThread() throws Exception {
             TestUtils.waitForCondition(() -> kafkaStreams.state() == KafkaStreams.State.RUNNING, "wait for running");
         }
     }
+
+    @Test
+    public void shouldRemoveStreamThread() throws Exception {
+        try (final KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), properties)) {
+            StreamsTestUtils.startKafkaStreamsAndWaitForRunningState(kafkaStreams);
+            final int oldThreadCount = kafkaStreams.localThreadsMetadata().size();
+            kafkaStreams.removeStreamThread();
+            assertThat(kafkaStreams.localThreadsMetadata().size(), equalTo(oldThreadCount - 1));
+        }
+    }
+
+    @Test
+    public void shouldAddAndRemoveStreamThreadsWhileKeepingNamesCorrect() throws Exception {
+        try (final KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), properties)) {
+            StreamsTestUtils.startKafkaStreamsAndWaitForRunningState(kafkaStreams);
+            int oldThreadCount = kafkaStreams.localThreadsMetadata().size();
+            assertThat(kafkaStreams.localThreadsMetadata().stream().map(t -> t.threadName().split("-StreamThread-")[1]).sorted().toArray(), equalTo(new String[] {"1", "2"}));
+
+            final Optional<String> name = kafkaStreams.addStreamThread();
+
+            assertThat(name, CoreMatchers.not(Optional.empty()));

Review comment:
       nit: Since you statically imported the other matchers, you could statically import this one as well, to be consistent.

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java
##########
@@ -108,4 +108,53 @@ public void shouldAddStreamThread() throws Exception {
             TestUtils.waitForCondition(() -> kafkaStreams.state() == KafkaStreams.State.RUNNING, "wait for running");
         }
     }
+
+    @Test
+    public void shouldRemoveStreamThread() throws Exception {
+        try (final KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), properties)) {
+            StreamsTestUtils.startKafkaStreamsAndWaitForRunningState(kafkaStreams);
+            final int oldThreadCount = kafkaStreams.localThreadsMetadata().size();
+            kafkaStreams.removeStreamThread();
+            assertThat(kafkaStreams.localThreadsMetadata().size(), equalTo(oldThreadCount - 1));
+        }
+    }
+
+    @Test
+    public void shouldAddAndRemoveStreamThreadsWhileKeepingNamesCorrect() throws Exception {
+        try (final KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), properties)) {
+            StreamsTestUtils.startKafkaStreamsAndWaitForRunningState(kafkaStreams);
+            int oldThreadCount = kafkaStreams.localThreadsMetadata().size();
+            assertThat(kafkaStreams.localThreadsMetadata().stream().map(t -> t.threadName().split("-StreamThread-")[1]).sorted().toArray(), equalTo(new String[] {"1", "2"}));
+
+            final Optional<String> name = kafkaStreams.addStreamThread();
+
+            assertThat(name, CoreMatchers.not(Optional.empty()));
+            TestUtils.waitForCondition(
+                () -> kafkaStreams.localThreadsMetadata().stream().sequential()
+                        .map(ThreadMetadata::threadName).anyMatch(t -> t.equals(name.orElse(""))),

Review comment:
       Why not `name.get()` instead of `name.orElse("")`?
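
       To illustrate the difference: with `orElse("")` an empty `Optional` is silently compared against the empty string, whereas `get()` throws `NoSuchElementException` and points directly at the real problem. A minimal sketch, where `OptionalNameCheck` and `containsName` are hypothetical helpers made up for this comment:

```java
import java.util.List;
import java.util.Optional;

// Hypothetical helper, only meant to show the behavior of Optional.get() in this check.
final class OptionalNameCheck {
    static boolean containsName(final List<String> threadNames, final Optional<String> name) {
        // get() throws NoSuchElementException if no name was returned,
        // instead of quietly searching for "" as orElse("") would.
        final String expected = name.get();
        return threadNames.stream().anyMatch(t -> t.equals(expected));
    }
}
```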

##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -924,25 +924,64 @@ private StreamThread createStreamThread(final long cacheSizePerThread, final int
      * @return name of the added stream thread or empty if a new stream thread could not be added
      */
     public Optional<String> addStreamThread() {
-        synchronized (changeThreadCount) {
-            if (isRunningOrRebalancing()) {
-                final int threadIdx = getNextThreadIndex();
-                final long cacheSizePerThread = getCacheSizePerThread(threads.size() + 1);
+        if (isRunningOrRebalancing()) {
+            final int threadIdx;
+            final long cacheSizePerThread;
+            synchronized (changeThreadCount) {
+                threadIdx = getNextThreadIndex();
+                cacheSizePerThread = getCacheSizePerThread(threads.size() + 1);
                 resizeThreadCache(cacheSizePerThread);
-                final StreamThread streamThread = createStreamThread(cacheSizePerThread, threadIdx);
-                synchronized (stateLock) {
-                    if (isRunningOrRebalancing()) {
-                        streamThread.start();
-                        return Optional.of(streamThread.getName());
-                    } else {
-                        streamThread.shutdown();
+            }
+            final StreamThread streamThread = createStreamThread(cacheSizePerThread, threadIdx);
+
+            synchronized (stateLock) {
+                if (isRunningOrRebalancing()) {
+                    streamThread.start();
+                    return Optional.of(streamThread.getName());
+                } else {
+                    streamThread.shutdown();
+                    threads.remove(streamThread);
+                    resizeThreadCache(getCacheSizePerThread(threads.size()));
+                    log.warn("Cannot add a stream thread in state " + state());
+                    return Optional.empty();
+                }
+            }
+        }
+        log.warn("Cannot add a stream thread in state " + state());
+        return Optional.empty();
+    }
+
+    /**
+     * Removes one stream thread out of the running stream threads from this Kafka Streams client.
+     * <p>
+     * The removed stream thread is gracefully shut down. This method does not specify which stream
+     * thread is shut down.
+     * <p>
+     * Since the number of stream threads decreases, the sizes of the caches in the remaining stream
+     * threads are adapted so that the sum of the cache sizes over all stream threads equals the total
+     * cache size specified in configuration {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG}.
+     *
+     * @return name of the removed stream thread or empty if a stream thread could not be removed because
+     *         no stream threads are alive
+     */
+    public Optional<String> removeStreamThread() {
+        if (isRunningOrRebalancing()) {
+            for (final StreamThread streamThread : threads) {
+                if (streamThread.isAlive()) {

Review comment:
       This might be a nit, but since we wait on the stream thread state below, why do we not check here that the stream thread state is not `DEAD`? That would be easier to understand when reading the code, IMO.

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java
##########
@@ -108,4 +108,53 @@ public void shouldAddStreamThread() throws Exception {
             TestUtils.waitForCondition(() -> kafkaStreams.state() == KafkaStreams.State.RUNNING, "wait for running");
         }
     }
+
+    @Test
+    public void shouldRemoveStreamThread() throws Exception {
+        try (final KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), properties)) {
+            StreamsTestUtils.startKafkaStreamsAndWaitForRunningState(kafkaStreams);
+            final int oldThreadCount = kafkaStreams.localThreadsMetadata().size();
+            kafkaStreams.removeStreamThread();
+            assertThat(kafkaStreams.localThreadsMetadata().size(), equalTo(oldThreadCount - 1));
+        }
+    }
+
+    @Test
+    public void shouldAddAndRemoveStreamThreadsWhileKeepingNamesCorrect() throws Exception {
+        try (final KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), properties)) {
+            StreamsTestUtils.startKafkaStreamsAndWaitForRunningState(kafkaStreams);
+            int oldThreadCount = kafkaStreams.localThreadsMetadata().size();
+            assertThat(kafkaStreams.localThreadsMetadata().stream().map(t -> t.threadName().split("-StreamThread-")[1]).sorted().toArray(), equalTo(new String[] {"1", "2"}));
+
+            final Optional<String> name = kafkaStreams.addStreamThread();
+
+            assertThat(name, CoreMatchers.not(Optional.empty()));
+            TestUtils.waitForCondition(
+                () -> kafkaStreams.localThreadsMetadata().stream().sequential()
+                        .map(ThreadMetadata::threadName).anyMatch(t -> t.equals(name.orElse(""))),
+                "Wait for the thread to be added"
+            );
+            assertThat(kafkaStreams.localThreadsMetadata().size(), equalTo(oldThreadCount + 1));
+            assertThat(kafkaStreams.localThreadsMetadata().stream().map(t -> t.threadName().split("-StreamThread-")[1]).sorted().toArray(), equalTo(new String[] {"1", "2", "3"}));
+            TestUtils.waitForCondition(() -> kafkaStreams.state() == KafkaStreams.State.RUNNING, "wait for running");
+
+
+            oldThreadCount = kafkaStreams.localThreadsMetadata().size();
+            kafkaStreams.removeStreamThread();
+            assertThat(kafkaStreams.localThreadsMetadata().size(), equalTo(oldThreadCount - 1));
+            assertThat(kafkaStreams.localThreadsMetadata().stream().map(t -> t.threadName().split("-StreamThread-")[1]).sorted().toArray(), equalTo(new String[] {"2", "3"}));

Review comment:
       This is not a guarantee that we give in the KIP. Assuming that the first stream thread is always the one removed is too strict for this test.
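
       A looser verification could use the name returned by `removeStreamThread()` and only check that exactly this thread disappeared. A rough sketch of the idea, with `RemovedThreadCheck` and `removedExactlyOne` as hypothetical helpers and plain sets of names standing in for the thread metadata:

```java
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;

// Hypothetical helper: verifies a removal without assuming which thread was picked.
final class RemovedThreadCheck {
    static boolean removedExactlyOne(final Set<String> before,
                                     final Optional<String> removed,
                                     final Set<String> after) {
        if (!removed.isPresent()) {
            return false; // nothing was removed
        }
        // Expected result: the names from before, minus the one that was reported as removed.
        final Set<String> expected = new HashSet<>(before);
        return expected.remove(removed.get()) && expected.equals(after);
    }
}
```

       This passes no matter whether thread 1, 2, or 3 was shut down, as long as the returned name matches the thread that is gone.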

##########
File path: streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
##########
@@ -631,6 +634,23 @@ public void shouldNotAddThreadWhenError() {
         assertThat(streams.threads.size(), equalTo(oldSize));
     }
 
+    @Test
+    public void shouldRemoveThread() throws InterruptedException {
+        props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2);
+        final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time);
+        streams.start();
+        final int oldSize = streams.threads.size();
+        TestUtils.waitForCondition(() -> streams.state() == KafkaStreams.State.RUNNING, 15L, "wait until running");
+        assertThat(streams.removeStreamThread(), equalTo(Optional.of("newThread")));
+        assertThat(streams.threads.size(), equalTo(oldSize - 1));
+    }
+
+    @Test
+    public void shouldNotRemoveThreadWhenNotRunning() {
+        props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1);
+        final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time);
+        assertThat(streams.removeStreamThread(), equalTo(Optional.empty()));
+    }
 

Review comment:
       A test is missing that verifies the behavior when a stream thread in state `DEAD` is in the list of stream threads.

##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -924,25 +924,64 @@ private StreamThread createStreamThread(final long cacheSizePerThread, final int
      * @return name of the added stream thread or empty if a new stream thread could not be added
      */
     public Optional<String> addStreamThread() {
-        synchronized (changeThreadCount) {
-            if (isRunningOrRebalancing()) {
-                final int threadIdx = getNextThreadIndex();
-                final long cacheSizePerThread = getCacheSizePerThread(threads.size() + 1);
+        if (isRunningOrRebalancing()) {
+            final int threadIdx;
+            final long cacheSizePerThread;
+            synchronized (changeThreadCount) {
+                threadIdx = getNextThreadIndex();
+                cacheSizePerThread = getCacheSizePerThread(threads.size() + 1);
                 resizeThreadCache(cacheSizePerThread);
-                final StreamThread streamThread = createStreamThread(cacheSizePerThread, threadIdx);
-                synchronized (stateLock) {
-                    if (isRunningOrRebalancing()) {
-                        streamThread.start();
-                        return Optional.of(streamThread.getName());
-                    } else {
-                        streamThread.shutdown();
+            }
+            final StreamThread streamThread = createStreamThread(cacheSizePerThread, threadIdx);
+
+            synchronized (stateLock) {
+                if (isRunningOrRebalancing()) {
+                    streamThread.start();
+                    return Optional.of(streamThread.getName());
+                } else {
+                    streamThread.shutdown();
+                    threads.remove(streamThread);
+                    resizeThreadCache(getCacheSizePerThread(threads.size()));
+                    log.warn("Cannot add a stream thread in state " + state());
+                    return Optional.empty();
+                }
+            }
+        }
+        log.warn("Cannot add a stream thread in state " + state());
+        return Optional.empty();
+    }
+
+    /**
+     * Removes one stream thread out of the running stream threads from this Kafka Streams client.
+     * <p>
+     * The removed stream thread is gracefully shut down. This method does not specify which stream
+     * thread is shut down.
+     * <p>
+     * Since the number of stream threads decreases, the sizes of the caches in the remaining stream
+     * threads are adapted so that the sum of the cache sizes over all stream threads equals the total
+     * cache size specified in configuration {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG}.
+     *
+     * @return name of the removed stream thread or empty if a stream thread could not be removed because
+     *         no stream threads are alive
+     */
+    public Optional<String> removeStreamThread() {
+        if (isRunningOrRebalancing()) {
+            for (final StreamThread streamThread : threads) {
+                if (streamThread.isAlive()) {
+                    streamThread.shutdown();
+                    if (!streamThread.getName().equals(Thread.currentThread().getName())) {
+                        streamThread.waitOnThreadState(StreamThread.State.DEAD);
+                    }
+                    synchronized (changeThreadCount) {
+                        final long cacheSizePerThread = threads.size() == 1 ? 0 : getCacheSizePerThread(threads.size() - 1);
+                        resizeThreadCache(cacheSizePerThread);
                         threads.remove(streamThread);
-                        resizeThreadCache(getCacheSizePerThread(threads.size()));
-                        return Optional.empty();
                     }
+                    return Optional.of(streamThread.getName());
                 }
             }
         }
+        log.warn("Cannot remove a stream thread in state " + state());

Review comment:
       Some more information about why we cannot remove a stream thread would be useful.

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java
##########
@@ -108,4 +108,53 @@ public void shouldAddStreamThread() throws Exception {
             TestUtils.waitForCondition(() -> kafkaStreams.state() == KafkaStreams.State.RUNNING, "wait for running");
         }
     }
+
+    @Test
+    public void shouldRemoveStreamThread() throws Exception {
+        try (final KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), properties)) {
+            StreamsTestUtils.startKafkaStreamsAndWaitForRunningState(kafkaStreams);
+            final int oldThreadCount = kafkaStreams.localThreadsMetadata().size();
+            kafkaStreams.removeStreamThread();
+            assertThat(kafkaStreams.localThreadsMetadata().size(), equalTo(oldThreadCount - 1));
+        }
+    }
+
+    @Test
+    public void shouldAddAndRemoveStreamThreadsWhileKeepingNamesCorrect() throws Exception {
+        try (final KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), properties)) {
+            StreamsTestUtils.startKafkaStreamsAndWaitForRunningState(kafkaStreams);
+            int oldThreadCount = kafkaStreams.localThreadsMetadata().size();
+            assertThat(kafkaStreams.localThreadsMetadata().stream().map(t -> t.threadName().split("-StreamThread-")[1]).sorted().toArray(), equalTo(new String[] {"1", "2"}));
+
+            final Optional<String> name = kafkaStreams.addStreamThread();
+
+            assertThat(name, CoreMatchers.not(Optional.empty()));
+            TestUtils.waitForCondition(
+                () -> kafkaStreams.localThreadsMetadata().stream().sequential()
+                        .map(ThreadMetadata::threadName).anyMatch(t -> t.equals(name.orElse(""))),

Review comment:
       nit: wrong indentation 

##########
File path: streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
##########
@@ -631,6 +634,23 @@ public void shouldNotAddThreadWhenError() {
         assertThat(streams.threads.size(), equalTo(oldSize));
     }
 
+    @Test
+    public void shouldRemoveThread() throws InterruptedException {
+        props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2);
+        final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time);
+        streams.start();
+        final int oldSize = streams.threads.size();
+        TestUtils.waitForCondition(() -> streams.state() == KafkaStreams.State.RUNNING, 15L, "wait until running");

Review comment:
       I think it would be better to use something like `Kafka Streams client did not reach state RUNNING` as condition details instead of `wait until running`. BTW, the line is too long. 😁 

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java
##########
@@ -108,4 +108,53 @@ public void shouldAddStreamThread() throws Exception {
             TestUtils.waitForCondition(() -> kafkaStreams.state() == KafkaStreams.State.RUNNING, "wait for running");
         }
     }
+
+    @Test
+    public void shouldRemoveStreamThread() throws Exception {
+        try (final KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), properties)) {
+            StreamsTestUtils.startKafkaStreamsAndWaitForRunningState(kafkaStreams);
+            final int oldThreadCount = kafkaStreams.localThreadsMetadata().size();
+            kafkaStreams.removeStreamThread();
+            assertThat(kafkaStreams.localThreadsMetadata().size(), equalTo(oldThreadCount - 1));
+        }
+    }
+
+    @Test
+    public void shouldAddAndRemoveStreamThreadsWhileKeepingNamesCorrect() throws Exception {
+        try (final KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), properties)) {
+            StreamsTestUtils.startKafkaStreamsAndWaitForRunningState(kafkaStreams);
+            int oldThreadCount = kafkaStreams.localThreadsMetadata().size();
+            assertThat(kafkaStreams.localThreadsMetadata().stream().map(t -> t.threadName().split("-StreamThread-")[1]).sorted().toArray(), equalTo(new String[] {"1", "2"}));
+
+            final Optional<String> name = kafkaStreams.addStreamThread();
+
+            assertThat(name, CoreMatchers.not(Optional.empty()));
+            TestUtils.waitForCondition(
+                () -> kafkaStreams.localThreadsMetadata().stream().sequential()
+                        .map(ThreadMetadata::threadName).anyMatch(t -> t.equals(name.orElse(""))),
+                "Wait for the thread to be added"

Review comment:
       IMO, this string should give details when the condition is not met, like `stream thread has not been added`. Same applies to the other wait conditions.
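
       The reason is that the string is only shown when the wait fails. The sketch below is a simplified stand-in for a wait utility, not the real `TestUtils.waitForCondition`; `WaitSketch`, the polling interval, and the exact message format are assumptions made for illustration.

```java
import java.util.function.BooleanSupplier;

// Simplified, hypothetical stand-in for a wait-for-condition test utility.
final class WaitSketch {
    static void waitForCondition(final BooleanSupplier condition,
                                 final long maxWaitMs,
                                 final String conditionDetails) {
        final long deadline = System.currentTimeMillis() + maxWaitMs;
        while (!condition.getAsBoolean()) {
            if (System.currentTimeMillis() >= deadline) {
                // The details only surface here, so they should describe what did NOT happen.
                throw new AssertionError(
                    "Condition not met within timeout " + maxWaitMs + ". " + conditionDetails);
            }
            try {
                Thread.sleep(10L); // poll again shortly
            } catch (final InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new AssertionError("Interrupted while waiting. " + conditionDetails);
            }
        }
    }
}
```

       A failure then reads like `Condition not met within timeout 50. Stream thread has not been added`, which is easier to act on than `Wait for the thread to be added`.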

##########
File path: streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
##########
@@ -631,6 +634,23 @@ public void shouldNotAddThreadWhenError() {
         assertThat(streams.threads.size(), equalTo(oldSize));
     }
 
+    @Test
+    public void shouldRemoveThread() throws InterruptedException {
+        props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2);
+        final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time);
+        streams.start();
+        final int oldSize = streams.threads.size();
+        TestUtils.waitForCondition(() -> streams.state() == KafkaStreams.State.RUNNING, 15L, "wait until running");
+        assertThat(streams.removeStreamThread(), equalTo(Optional.of("newThread")));
+        assertThat(streams.threads.size(), equalTo(oldSize - 1));
+    }
+
+    @Test
+    public void shouldNotRemoveThreadWhenNotRunning() {
+        props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1);
+        final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time);
+        assertThat(streams.removeStreamThread(), equalTo(Optional.empty()));
+    }

Review comment:
       Why do you not also verify the number of stream threads here?

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java
##########
@@ -108,4 +108,53 @@ public void shouldAddStreamThread() throws Exception {
             TestUtils.waitForCondition(() -> kafkaStreams.state() == KafkaStreams.State.RUNNING, "wait for running");
         }
     }
+
+    @Test
+    public void shouldRemoveStreamThread() throws Exception {
+        try (final KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), properties)) {
+            StreamsTestUtils.startKafkaStreamsAndWaitForRunningState(kafkaStreams);
+            final int oldThreadCount = kafkaStreams.localThreadsMetadata().size();
+            kafkaStreams.removeStreamThread();
+            assertThat(kafkaStreams.localThreadsMetadata().size(), equalTo(oldThreadCount - 1));
+        }

Review comment:
       You should also verify the return value of `removeStreamThread()` here.

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java
##########
@@ -108,4 +108,53 @@ public void shouldAddStreamThread() throws Exception {
             TestUtils.waitForCondition(() -> kafkaStreams.state() == KafkaStreams.State.RUNNING, "wait for running");
         }
     }
+
+    @Test
+    public void shouldRemoveStreamThread() throws Exception {
+        try (final KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), properties)) {
+            StreamsTestUtils.startKafkaStreamsAndWaitForRunningState(kafkaStreams);
+            final int oldThreadCount = kafkaStreams.localThreadsMetadata().size();
+            kafkaStreams.removeStreamThread();
+            assertThat(kafkaStreams.localThreadsMetadata().size(), equalTo(oldThreadCount - 1));
+        }
+    }
+
+    @Test
+    public void shouldAddAndRemoveStreamThreadsWhileKeepingNamesCorrect() throws Exception {
+        try (final KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), properties)) {
+            StreamsTestUtils.startKafkaStreamsAndWaitForRunningState(kafkaStreams);
+            int oldThreadCount = kafkaStreams.localThreadsMetadata().size();
+            assertThat(kafkaStreams.localThreadsMetadata().stream().map(t -> t.threadName().split("-StreamThread-")[1]).sorted().toArray(), equalTo(new String[] {"1", "2"}));
+
+            final Optional<String> name = kafkaStreams.addStreamThread();
+
+            assertThat(name, CoreMatchers.not(Optional.empty()));
+            TestUtils.waitForCondition(
+                () -> kafkaStreams.localThreadsMetadata().stream().sequential()
+                        .map(ThreadMetadata::threadName).anyMatch(t -> t.equals(name.orElse(""))),
+                "Wait for the thread to be added"
+            );
+            assertThat(kafkaStreams.localThreadsMetadata().size(), equalTo(oldThreadCount + 1));
+            assertThat(kafkaStreams.localThreadsMetadata().stream().map(t -> t.threadName().split("-StreamThread-")[1]).sorted().toArray(), equalTo(new String[] {"1", "2", "3"}));
+            TestUtils.waitForCondition(() -> kafkaStreams.state() == KafkaStreams.State.RUNNING, "wait for running");
+
+
+            oldThreadCount = kafkaStreams.localThreadsMetadata().size();
+            kafkaStreams.removeStreamThread();
+            assertThat(kafkaStreams.localThreadsMetadata().size(), equalTo(oldThreadCount - 1));
+            assertThat(kafkaStreams.localThreadsMetadata().stream().map(t -> t.threadName().split("-StreamThread-")[1]).sorted().toArray(), equalTo(new String[] {"2", "3"}));
+
+            final Optional<String> name2 = kafkaStreams.addStreamThread();
+
+            assertThat(name, CoreMatchers.not(Optional.empty()));
+            TestUtils.waitForCondition(
+                () -> kafkaStreams.localThreadsMetadata().stream().sequential()
+                        .map(ThreadMetadata::threadName).anyMatch(t -> t.equals(name2.orElse(""))),
+                "Wait for the thread to be added"
+            );
+            assertThat(kafkaStreams.localThreadsMetadata().size(), equalTo(oldThreadCount));
+            assertThat(kafkaStreams.localThreadsMetadata().stream().map(t -> t.threadName().split("-StreamThread-")[1]).sorted().toArray(), equalTo(new String[] {"1", "2", "3"}));

Review comment:
       Here you should test if the stream thread has the name of the stream thread that was removed before.
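
       The expected names in this test imply that a newly added stream thread takes the smallest index that is currently unused, so the name of the removed thread comes back. A small sketch of that rule, assuming this is how the index is chosen; `NextThreadIndex` and `nextFreeIndex` are hypothetical names and not the actual `getNextThreadIndex()` implementation:

```java
import java.util.Set;

// Hypothetical illustration of "reuse the smallest free index"; not the real KafkaStreams code.
final class NextThreadIndex {
    static int nextFreeIndex(final Set<Integer> usedIndexes) {
        int candidate = 1; // stream thread indexes start at 1 in the names checked by this test
        while (usedIndexes.contains(candidate)) {
            candidate++;
        }
        return candidate;
    }
}
```

       Under that assumption, the test can compare the index in the name returned by the second `addStreamThread()` call with the index in the name returned by `removeStreamThread()`.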

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java
##########
@@ -108,4 +108,53 @@ public void shouldAddStreamThread() throws Exception {
             TestUtils.waitForCondition(() -> kafkaStreams.state() == KafkaStreams.State.RUNNING, "wait for running");
         }
     }
+
+    @Test
+    public void shouldRemoveStreamThread() throws Exception {
+        try (final KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), properties)) {
+            StreamsTestUtils.startKafkaStreamsAndWaitForRunningState(kafkaStreams);
+            final int oldThreadCount = kafkaStreams.localThreadsMetadata().size();
+            kafkaStreams.removeStreamThread();
+            assertThat(kafkaStreams.localThreadsMetadata().size(), equalTo(oldThreadCount - 1));
+        }
+    }
+
+    @Test
+    public void shouldAddAndRemoveStreamThreadsWhileKeepingNamesCorrect() throws Exception {
+        try (final KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), properties)) {
+            StreamsTestUtils.startKafkaStreamsAndWaitForRunningState(kafkaStreams);
+            int oldThreadCount = kafkaStreams.localThreadsMetadata().size();
+            assertThat(kafkaStreams.localThreadsMetadata().stream().map(t -> t.threadName().split("-StreamThread-")[1]).sorted().toArray(), equalTo(new String[] {"1", "2"}));
+
+            final Optional<String> name = kafkaStreams.addStreamThread();
+
+            assertThat(name, CoreMatchers.not(Optional.empty()));
+            TestUtils.waitForCondition(
+                () -> kafkaStreams.localThreadsMetadata().stream().sequential()
+                        .map(ThreadMetadata::threadName).anyMatch(t -> t.equals(name.orElse(""))),
+                "Wait for the thread to be added"
+            );
+            assertThat(kafkaStreams.localThreadsMetadata().size(), equalTo(oldThreadCount + 1));
+            assertThat(kafkaStreams.localThreadsMetadata().stream().map(t -> t.threadName().split("-StreamThread-")[1]).sorted().toArray(), equalTo(new String[] {"1", "2", "3"}));
+            TestUtils.waitForCondition(() -> kafkaStreams.state() == KafkaStreams.State.RUNNING, "wait for running");
+
+
+            oldThreadCount = kafkaStreams.localThreadsMetadata().size();
+            kafkaStreams.removeStreamThread();

Review comment:
       Could you add a verification that the returned name is not empty?

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java
##########
@@ -108,4 +108,53 @@ public void shouldAddStreamThread() throws Exception {
             TestUtils.waitForCondition(() -> kafkaStreams.state() == KafkaStreams.State.RUNNING, "wait for running");
         }
     }
+
+    @Test
+    public void shouldRemoveStreamThread() throws Exception {
+        try (final KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), properties)) {
+            StreamsTestUtils.startKafkaStreamsAndWaitForRunningState(kafkaStreams);
+            final int oldThreadCount = kafkaStreams.localThreadsMetadata().size();
+            kafkaStreams.removeStreamThread();
+            assertThat(kafkaStreams.localThreadsMetadata().size(), equalTo(oldThreadCount - 1));
+        }
+    }
+
+    @Test
+    public void shouldAddAndRemoveStreamThreadsWhileKeepingNamesCorrect() throws Exception {
+        try (final KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), properties)) {
+            StreamsTestUtils.startKafkaStreamsAndWaitForRunningState(kafkaStreams);
+            int oldThreadCount = kafkaStreams.localThreadsMetadata().size();
+            assertThat(kafkaStreams.localThreadsMetadata().stream().map(t -> t.threadName().split("-StreamThread-")[1]).sorted().toArray(), equalTo(new String[] {"1", "2"}));
+
+            final Optional<String> name = kafkaStreams.addStreamThread();
+
+            assertThat(name, CoreMatchers.not(Optional.empty()));

Review comment:
       Why do you not verify the actual name? The KIP defines which name should be returned, and we know that the other two stream threads are named 1 and 2.

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java
##########
@@ -108,4 +108,53 @@ public void shouldAddStreamThread() throws Exception {
             TestUtils.waitForCondition(() -> kafkaStreams.state() == KafkaStreams.State.RUNNING, "wait for running");
         }
     }
+
+    @Test
+    public void shouldRemoveStreamThread() throws Exception {
+        try (final KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), properties)) {
+            StreamsTestUtils.startKafkaStreamsAndWaitForRunningState(kafkaStreams);
+            final int oldThreadCount = kafkaStreams.localThreadsMetadata().size();
+            kafkaStreams.removeStreamThread();
+            assertThat(kafkaStreams.localThreadsMetadata().size(), equalTo(oldThreadCount - 1));
+        }
+    }
+
+    @Test
+    public void shouldAddAndRemoveStreamThreadsWhileKeepingNamesCorrect() throws Exception {
+        try (final KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), properties)) {
+            StreamsTestUtils.startKafkaStreamsAndWaitForRunningState(kafkaStreams);
+            int oldThreadCount = kafkaStreams.localThreadsMetadata().size();
+            assertThat(kafkaStreams.localThreadsMetadata().stream().map(t -> t.threadName().split("-StreamThread-")[1]).sorted().toArray(), equalTo(new String[] {"1", "2"}));
+
+            final Optional<String> name = kafkaStreams.addStreamThread();
+
+            assertThat(name, CoreMatchers.not(Optional.empty()));
+            TestUtils.waitForCondition(
+                () -> kafkaStreams.localThreadsMetadata().stream().sequential()
+                        .map(ThreadMetadata::threadName).anyMatch(t -> t.equals(name.orElse(""))),
+                "Wait for the thread to be added"
+            );
+            assertThat(kafkaStreams.localThreadsMetadata().size(), equalTo(oldThreadCount + 1));
+            assertThat(kafkaStreams.localThreadsMetadata().stream().map(t -> t.threadName().split("-StreamThread-")[1]).sorted().toArray(), equalTo(new String[] {"1", "2", "3"}));
+            TestUtils.waitForCondition(() -> kafkaStreams.state() == KafkaStreams.State.RUNNING, "wait for running");
+
+
+            oldThreadCount = kafkaStreams.localThreadsMetadata().size();
+            kafkaStreams.removeStreamThread();

Review comment:
       Could you surround this call with blank lines as you did for the others? That makes the calls under test more visible.

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java
##########
@@ -108,4 +108,53 @@ public void shouldAddStreamThread() throws Exception {
             TestUtils.waitForCondition(() -> kafkaStreams.state() == KafkaStreams.State.RUNNING, "wait for running");
         }
     }
+
+    @Test
+    public void shouldRemoveStreamThread() throws Exception {
+        try (final KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), properties)) {
+            StreamsTestUtils.startKafkaStreamsAndWaitForRunningState(kafkaStreams);
+            final int oldThreadCount = kafkaStreams.localThreadsMetadata().size();
+            kafkaStreams.removeStreamThread();
+            assertThat(kafkaStreams.localThreadsMetadata().size(), equalTo(oldThreadCount - 1));
+        }
+    }
+
+    @Test
+    public void shouldAddAndRemoveStreamThreadsWhileKeepingNamesCorrect() throws Exception {
+        try (final KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), properties)) {
+            StreamsTestUtils.startKafkaStreamsAndWaitForRunningState(kafkaStreams);
+            int oldThreadCount = kafkaStreams.localThreadsMetadata().size();
+            assertThat(kafkaStreams.localThreadsMetadata().stream().map(t -> t.threadName().split("-StreamThread-")[1]).sorted().toArray(), equalTo(new String[] {"1", "2"}));
+
+            final Optional<String> name = kafkaStreams.addStreamThread();
+
+            assertThat(name, CoreMatchers.not(Optional.empty()));
+            TestUtils.waitForCondition(
+                () -> kafkaStreams.localThreadsMetadata().stream().sequential()
+                        .map(ThreadMetadata::threadName).anyMatch(t -> t.equals(name.orElse(""))),
+                "Wait for the thread to be added"
+            );
+            assertThat(kafkaStreams.localThreadsMetadata().size(), equalTo(oldThreadCount + 1));
+            assertThat(kafkaStreams.localThreadsMetadata().stream().map(t -> t.threadName().split("-StreamThread-")[1]).sorted().toArray(), equalTo(new String[] {"1", "2", "3"}));
+            TestUtils.waitForCondition(() -> kafkaStreams.state() == KafkaStreams.State.RUNNING, "wait for running");
+
+
+            oldThreadCount = kafkaStreams.localThreadsMetadata().size();
+            kafkaStreams.removeStreamThread();
+            assertThat(kafkaStreams.localThreadsMetadata().size(), equalTo(oldThreadCount - 1));
+            assertThat(kafkaStreams.localThreadsMetadata().stream().map(t -> t.threadName().split("-StreamThread-")[1]).sorted().toArray(), equalTo(new String[] {"2", "3"}));
+
+            final Optional<String> name2 = kafkaStreams.addStreamThread();
+
+            assertThat(name, CoreMatchers.not(Optional.empty()));

Review comment:
       You verify the wrong name here.
   ```suggestion
               assertThat(name2, CoreMatchers.not(Optional.empty()));
   ```
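
The name assertions in this test (`{"1", "2"}`, then `{"1", "2", "3"}`, then `{"2", "3"}` after a removal) depend on how thread indices are handed out. The hunks call `getNextThreadIndex()` but do not show its body, so the following is only a minimal sketch under the assumption that the smallest free index is reused. `ThreadIndexAllocator` and `nextThreadIndex` are hypothetical names, not the PR's code.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class ThreadIndexAllocator {
    // Hypothetical helper: returns the smallest positive index that is not
    // already used by a name of the form "<clientId>-StreamThread-<index>".
    static int nextThreadIndex(final String clientId, final List<String> threadNames) {
        final String baseName = clientId + "-StreamThread-";
        final Set<String> inUse = new HashSet<>(threadNames);
        for (int i = 1; i <= threadNames.size(); i++) {
            if (!inUse.contains(baseName + i)) {
                return i; // a gap left by a removed thread is reused first
            }
        }
        // No gap: all indices 1..n are taken, so append at the end.
        return threadNames.size() + 1;
    }
}
```

Under that assumption, adding a thread after thread 1 was removed would bring the names back to 1, 2, 3.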

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java
##########
@@ -108,4 +108,53 @@ public void shouldAddStreamThread() throws Exception {
             TestUtils.waitForCondition(() -> kafkaStreams.state() == KafkaStreams.State.RUNNING, "wait for running");
         }
     }
+
+    @Test
+    public void shouldRemoveStreamThread() throws Exception {
+        try (final KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), properties)) {
+            StreamsTestUtils.startKafkaStreamsAndWaitForRunningState(kafkaStreams);
+            final int oldThreadCount = kafkaStreams.localThreadsMetadata().size();
+            kafkaStreams.removeStreamThread();
+            assertThat(kafkaStreams.localThreadsMetadata().size(), equalTo(oldThreadCount - 1));
+        }
+    }
+
+    @Test
+    public void shouldAddAndRemoveStreamThreadsWhileKeepingNamesCorrect() throws Exception {
+        try (final KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), properties)) {
+            StreamsTestUtils.startKafkaStreamsAndWaitForRunningState(kafkaStreams);
+            int oldThreadCount = kafkaStreams.localThreadsMetadata().size();
+            assertThat(kafkaStreams.localThreadsMetadata().stream().map(t -> t.threadName().split("-StreamThread-")[1]).sorted().toArray(), equalTo(new String[] {"1", "2"}));
+
+            final Optional<String> name = kafkaStreams.addStreamThread();
+
+            assertThat(name, CoreMatchers.not(Optional.empty()));
+            TestUtils.waitForCondition(
+                () -> kafkaStreams.localThreadsMetadata().stream().sequential()
+                        .map(ThreadMetadata::threadName).anyMatch(t -> t.equals(name.orElse(""))),
+                "Wait for the thread to be added"
+            );
+            assertThat(kafkaStreams.localThreadsMetadata().size(), equalTo(oldThreadCount + 1));
+            assertThat(kafkaStreams.localThreadsMetadata().stream().map(t -> t.threadName().split("-StreamThread-")[1]).sorted().toArray(), equalTo(new String[] {"1", "2", "3"}));
+            TestUtils.waitForCondition(() -> kafkaStreams.state() == KafkaStreams.State.RUNNING, "wait for running");
+

Review comment:
       Please remove the empty line.







[GitHub] [kafka] wcarlson5 commented on a change in pull request #9695: KAFKA-10500: Remove thread

Posted by GitBox <gi...@apache.org>.
wcarlson5 commented on a change in pull request #9695:
URL: https://github.com/apache/kafka/pull/9695#discussion_r553047308



##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java
##########
@@ -128,6 +133,7 @@ public void shouldRemoveStreamThread() throws Exception {
             final int oldThreadCount = kafkaStreams.localThreadsMetadata().size();
             assertThat(kafkaStreams.removeStreamThread().get().split("-")[0], equalTo(appId));
             assertThat(kafkaStreams.localThreadsMetadata().size(), equalTo(oldThreadCount - 1));
+            waitForApplicationState(Collections.singletonList(kafkaStreams), KafkaStreams.State.RUNNING, DEFAULT_DURATION);

Review comment:
       I didn't see that util, but this should work.







[GitHub] [kafka] wcarlson5 commented on a change in pull request #9695: KAFKA-10500: Remove thread

Posted by GitBox <gi...@apache.org>.
wcarlson5 commented on a change in pull request #9695:
URL: https://github.com/apache/kafka/pull/9695#discussion_r538573317



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -924,28 +924,69 @@ private StreamThread createStreamThread(final long cacheSizePerThread, final int
      * @return name of the added stream thread or empty if a new stream thread could not be added
      */
     public Optional<String> addStreamThread() {
-        synchronized (changeThreadCount) {

Review comment:
       good idea

##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -924,22 +924,64 @@ private StreamThread createStreamThread(final long cacheSizePerThread, final int
      * @return name of the added stream thread or empty if a new stream thread could not be added
      */
     public Optional<String> addStreamThread() {
-        synchronized (changeThreadCount) {
-            if (isRunningOrRebalancing()) {
-                final int threadIdx = getNextThreadIndex();
-                final long cacheSizePerThread = getCacheSizePerThread(threads.size() + 1);
+        if (isRunningOrRebalancing()) {
+            final int threadIdx;
+            final long cacheSizePerThread;
+            synchronized (changeThreadCount) {
+                threadIdx = getNextThreadIndex();
+                cacheSizePerThread = getCacheSizePerThread(threads.size() + 1);
                 resizeThreadCache(cacheSizePerThread);
-                final StreamThread streamThread = createStreamThread(cacheSizePerThread, threadIdx);
-                synchronized (stateLock) {
-                    if (isRunningOrRebalancing()) {
-                        streamThread.start();
-                        return Optional.of(streamThread.getName());
-                    } else {
-                        streamThread.shutdown();
+            }
+            final StreamThread streamThread = createStreamThread(cacheSizePerThread, threadIdx);
+
+            synchronized (stateLock) {
+                if (isRunningOrRebalancing()) {
+                    streamThread.start();
+                    return Optional.of(streamThread.getName());
+                } else {
+                    streamThread.shutdown();
+                    threads.remove(streamThread);
+                    resizeThreadCache(getCacheSizePerThread(threads.size()));
+                    return Optional.empty();
+                }
+            }
+        }
+        return Optional.empty();
+    }
+
+    /**
+     * Removes one stream thread out of the running stream threads from this Kafka Streams client.
+     * <p>
+     * The removed stream thread is gracefully shut down. This method does not specify which stream
+     * thread is shut down.
+     * <p>
+     * Since the number of stream threads decreases, the sizes of the caches in the remaining stream
+     * threads are adapted so that the sum of the cache sizes over all stream threads equals the total
+     * cache size specified in configuration {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG}.
+     *
+     * @return name of the removed stream thread or empty if a stream thread could not be removed because
+     *         no stream threads are alive
+     */
+    public Optional<String> removeStreamThread() {
+        if (isRunningOrRebalancing()) {
+            for (final StreamThread streamThread : threads) {

Review comment:
       The thread list is synchronized, so that takes care of it, no?
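
One caveat worth keeping in mind here, assuming `threads` is a wrapper created by `Collections.synchronizedList` (its declaration is not part of this hunk): the wrapper synchronizes individual calls such as `add` and `remove`, but the JDK documentation requires callers to synchronize on the list themselves while iterating. A minimal sketch of that pattern, with the hypothetical names `SynchronizedIteration` and `firstMatching`:

```java
import java.util.List;
import java.util.Optional;

class SynchronizedIteration {
    // Iterates a list created by Collections.synchronizedList. The explicit
    // synchronized block holds the list's monitor for the whole traversal,
    // so no other thread can add or remove elements in the middle of the loop.
    static Optional<String> firstMatching(final List<String> syncList, final String prefix) {
        synchronized (syncList) {
            for (final String name : syncList) {
                if (name.startsWith(prefix)) {
                    return Optional.of(name);
                }
            }
        }
        return Optional.empty();
    }
}
```

Without the block, a concurrent modification during the loop may surface as a `ConcurrentModificationException`.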

##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -924,22 +924,64 @@ private StreamThread createStreamThread(final long cacheSizePerThread, final int
      * @return name of the added stream thread or empty if a new stream thread could not be added
      */
     public Optional<String> addStreamThread() {
-        synchronized (changeThreadCount) {
-            if (isRunningOrRebalancing()) {
-                final int threadIdx = getNextThreadIndex();
-                final long cacheSizePerThread = getCacheSizePerThread(threads.size() + 1);
+        if (isRunningOrRebalancing()) {
+            final int threadIdx;
+            final long cacheSizePerThread;
+            synchronized (changeThreadCount) {
+                threadIdx = getNextThreadIndex();
+                cacheSizePerThread = getCacheSizePerThread(threads.size() + 1);
                 resizeThreadCache(cacheSizePerThread);
-                final StreamThread streamThread = createStreamThread(cacheSizePerThread, threadIdx);
-                synchronized (stateLock) {
-                    if (isRunningOrRebalancing()) {
-                        streamThread.start();
-                        return Optional.of(streamThread.getName());
-                    } else {
-                        streamThread.shutdown();
+            }
+            final StreamThread streamThread = createStreamThread(cacheSizePerThread, threadIdx);
+
+            synchronized (stateLock) {
+                if (isRunningOrRebalancing()) {
+                    streamThread.start();
+                    return Optional.of(streamThread.getName());
+                } else {
+                    streamThread.shutdown();
+                    threads.remove(streamThread);
+                    resizeThreadCache(getCacheSizePerThread(threads.size()));
+                    return Optional.empty();
+                }
+            }
+        }
+        return Optional.empty();
+    }
+
+    /**
+     * Removes one stream thread out of the running stream threads from this Kafka Streams client.
+     * <p>
+     * The removed stream thread is gracefully shut down. This method does not specify which stream
+     * thread is shut down.
+     * <p>
+     * Since the number of stream threads decreases, the sizes of the caches in the remaining stream
+     * threads are adapted so that the sum of the cache sizes over all stream threads equals the total
+     * cache size specified in configuration {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG}.
+     *
+     * @return name of the removed stream thread or empty if a stream thread could not be removed because
+     *         no stream threads are alive
+     */
+    public Optional<String> removeStreamThread() {
+        if (isRunningOrRebalancing()) {

Review comment:
       I will log it on both empty returns

##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -924,22 +924,64 @@ private StreamThread createStreamThread(final long cacheSizePerThread, final int
      * @return name of the added stream thread or empty if a new stream thread could not be added
      */
     public Optional<String> addStreamThread() {
-        synchronized (changeThreadCount) {
-            if (isRunningOrRebalancing()) {
-                final int threadIdx = getNextThreadIndex();
-                final long cacheSizePerThread = getCacheSizePerThread(threads.size() + 1);
+        if (isRunningOrRebalancing()) {
+            final int threadIdx;
+            final long cacheSizePerThread;
+            synchronized (changeThreadCount) {
+                threadIdx = getNextThreadIndex();
+                cacheSizePerThread = getCacheSizePerThread(threads.size() + 1);
                 resizeThreadCache(cacheSizePerThread);
-                final StreamThread streamThread = createStreamThread(cacheSizePerThread, threadIdx);
-                synchronized (stateLock) {
-                    if (isRunningOrRebalancing()) {
-                        streamThread.start();
-                        return Optional.of(streamThread.getName());
-                    } else {
-                        streamThread.shutdown();
+            }
+            final StreamThread streamThread = createStreamThread(cacheSizePerThread, threadIdx);
+
+            synchronized (stateLock) {
+                if (isRunningOrRebalancing()) {
+                    streamThread.start();
+                    return Optional.of(streamThread.getName());
+                } else {
+                    streamThread.shutdown();
+                    threads.remove(streamThread);
+                    resizeThreadCache(getCacheSizePerThread(threads.size()));
+                    return Optional.empty();
+                }
+            }
+        }
+        return Optional.empty();
+    }
+
+    /**
+     * Removes one stream thread out of the running stream threads from this Kafka Streams client.
+     * <p>
+     * The removed stream thread is gracefully shut down. This method does not specify which stream
+     * thread is shut down.
+     * <p>
+     * Since the number of stream threads decreases, the sizes of the caches in the remaining stream
+     * threads are adapted so that the sum of the cache sizes over all stream threads equals the total
+     * cache size specified in configuration {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG}.
+     *
+     * @return name of the removed stream thread or empty if a stream thread could not be removed because
+     *         no stream threads are alive
+     */
+    public Optional<String> removeStreamThread() {
+        if (isRunningOrRebalancing()) {
+            for (final StreamThread streamThread : threads) {
+                if (streamThread.isAlive()) {
+                    streamThread.shutdown();
+                    while (streamThread.state() != StreamThread.State.DEAD && !streamThread.getName().equals(Thread.currentThread().getName())) {
+                        try {
+                            synchronized (streamThread.state()) {
+                                streamThread.state().wait(100);
+                            }
+                        } catch (final InterruptedException e) {
+                            e.printStackTrace();

Review comment:
       I think if we add a `StreamThread#waitOnThreadState`, we can handle it the same way as in `KafkaStreams#waitOnState`.

##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -924,22 +924,64 @@ private StreamThread createStreamThread(final long cacheSizePerThread, final int
      * @return name of the added stream thread or empty if a new stream thread could not be added
      */
     public Optional<String> addStreamThread() {
-        synchronized (changeThreadCount) {
-            if (isRunningOrRebalancing()) {
-                final int threadIdx = getNextThreadIndex();
-                final long cacheSizePerThread = getCacheSizePerThread(threads.size() + 1);
+        if (isRunningOrRebalancing()) {
+            final int threadIdx;
+            final long cacheSizePerThread;
+            synchronized (changeThreadCount) {
+                threadIdx = getNextThreadIndex();
+                cacheSizePerThread = getCacheSizePerThread(threads.size() + 1);
                 resizeThreadCache(cacheSizePerThread);
-                final StreamThread streamThread = createStreamThread(cacheSizePerThread, threadIdx);
-                synchronized (stateLock) {
-                    if (isRunningOrRebalancing()) {
-                        streamThread.start();
-                        return Optional.of(streamThread.getName());
-                    } else {
-                        streamThread.shutdown();
+            }
+            final StreamThread streamThread = createStreamThread(cacheSizePerThread, threadIdx);
+
+            synchronized (stateLock) {
+                if (isRunningOrRebalancing()) {
+                    streamThread.start();
+                    return Optional.of(streamThread.getName());
+                } else {
+                    streamThread.shutdown();
+                    threads.remove(streamThread);
+                    resizeThreadCache(getCacheSizePerThread(threads.size()));
+                    return Optional.empty();
+                }
+            }
+        }
+        return Optional.empty();
+    }
+
+    /**
+     * Removes one stream thread out of the running stream threads from this Kafka Streams client.
+     * <p>
+     * The removed stream thread is gracefully shut down. This method does not specify which stream
+     * thread is shut down.
+     * <p>
+     * Since the number of stream threads decreases, the sizes of the caches in the remaining stream
+     * threads are adapted so that the sum of the cache sizes over all stream threads equals the total
+     * cache size specified in configuration {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG}.
+     *
+     * @return name of the removed stream thread or empty if a stream thread could not be removed because
+     *         no stream threads are alive
+     */
+    public Optional<String> removeStreamThread() {
+        if (isRunningOrRebalancing()) {
+            for (final StreamThread streamThread : threads) {
+                if (streamThread.isAlive()) {
+                    streamThread.shutdown();
+                    while (streamThread.state() != StreamThread.State.DEAD && !streamThread.getName().equals(Thread.currentThread().getName())) {
+                        try {
+                            synchronized (streamThread.state()) {
+                                streamThread.state().wait(100);
+                            }
+                        } catch (final InterruptedException e) {
+                            e.printStackTrace();

Review comment:
       This will be moving to the other method, but if you look at `KafkaStreams#waitOnState` you will see that it also swallows the exception. If we follow the same pattern, we would swallow it here as well. If we want to deviate from that, we can just have `removeStreamThread()` throw the `InterruptedException`.
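
For reference, a minimal sketch of a timed wait on a thread state that neither prints the stack trace nor loses the interrupt. It is not the code of `KafkaStreams#waitOnState`; `ThreadStateWaiter`, its `State` enum, and the dedicated `stateLock` are hypothetical. The sketch waits on a private lock object rather than on the value returned by `state()`, since that value is a shared enum constant whose identity changes with every transition.

```java
import java.util.concurrent.TimeUnit;

class ThreadStateWaiter {
    enum State { RUNNING, PENDING_SHUTDOWN, DEAD }

    private final Object stateLock = new Object();
    private State state = State.RUNNING;

    // Publishes a new state and wakes up every waiter.
    void setState(final State newState) {
        synchronized (stateLock) {
            state = newState;
            stateLock.notifyAll();
        }
    }

    // Returns true if the target state was reached before the timeout elapsed.
    boolean waitOnState(final State target, final long timeoutMs) {
        final long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        boolean interrupted = false;
        try {
            synchronized (stateLock) {
                while (state != target) {
                    final long remainingMs = TimeUnit.NANOSECONDS.toMillis(deadline - System.nanoTime());
                    if (remainingMs <= 0) {
                        return false; // timed out; also avoids wait(0), which blocks forever
                    }
                    try {
                        stateLock.wait(remainingMs);
                    } catch (final InterruptedException e) {
                        interrupted = true; // remember the interrupt and keep waiting
                    }
                }
                return true;
            }
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt(); // restore the flag for the caller
            }
        }
    }
}
```

Restoring the interrupt flag is a middle ground between swallowing the exception and adding `throws InterruptedException` to the public method.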

##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -924,28 +924,69 @@ private StreamThread createStreamThread(final long cacheSizePerThread, final int
      * @return name of the added stream thread or empty if a new stream thread could not be added
      */
     public Optional<String> addStreamThread() {
-        synchronized (changeThreadCount) {
-            if (isRunningOrRebalancing()) {
-                final int threadIdx = getNextThreadIndex();
-                final long cacheSizePerThread = getCacheSizePerThread(threads.size() + 1);
+        if (isRunningOrRebalancing()) {
+            final int threadIdx;
+            final long cacheSizePerThread;
+            synchronized (changeThreadCount) {
+                threadIdx = getNextThreadIndex();
+                cacheSizePerThread = getCacheSizePerThread(threads.size() + 1);
                 resizeThreadCache(cacheSizePerThread);
-                final StreamThread streamThread = createStreamThread(cacheSizePerThread, threadIdx);
-                synchronized (stateLock) {
-                    if (isRunningOrRebalancing()) {
-                        streamThread.start();
-                        return Optional.of(streamThread.getName());
-                    } else {
-                        streamThread.shutdown();
+            }
+            final StreamThread streamThread = createStreamThread(cacheSizePerThread, threadIdx);
+
+            synchronized (stateLock) {
+                if (isRunningOrRebalancing()) {
+                    streamThread.start();
+                    return Optional.of(streamThread.getName());
+                } else {
+                    streamThread.shutdown();
+                    threads.remove(streamThread);
+                    resizeThreadCache(getCacheSizePerThread(threads.size()));
+                    return Optional.empty();
+                }
+            }
+        }
+        return Optional.empty();
+    }
+
+    /**
+     * Removes one stream thread out of the running stream threads from this Kafka Streams client.
+     *
+     * The removed stream thread is gracefully shut down. This method does not specify which stream
+     * thread is shut down.
+     *
+     * Since the number of stream threads decreases, the sizes of the caches in the remaining stream
+     * threads are adapted so that the sum of the cache sizes over all stream threads equals the total
+     * cache size specified in configuration {@code cache.max.bytes.buffering}.
+     *
+     * @return name of the removed stream thread or empty if a stream thread could not be removed because
+     *         no stream threads are alive
+     */
+    public Optional<String> removeStreamThread() {
+        if (isRunningOrRebalancing()) {
+            for (final StreamThread streamThread : threads) {
+                if (streamThread.isAlive()) {
+                    streamThread.shutdown();
+                    while (streamThread.state() != StreamThread.State.DEAD && !streamThread.getName().equals(Thread.currentThread().getName())) {

Review comment:
       Actually, it is for the case where the chosen thread is the thread that called `removeStreamThread()`. `REPLACE_THREAD` will not actually call `removeStreamThread()`.
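
The two decisions involved can be sketched in isolation, using plain thread names instead of `StreamThread` objects. The selection rule follows the condition used elsewhere in this PR (skip the calling thread unless it is the only one left); `RemovalCandidate`, `pick`, and `shouldBlockUntilDead` are hypothetical names introduced for illustration.

```java
import java.util.List;
import java.util.Optional;

class RemovalCandidate {
    // Picks the first alive thread that is not the caller; the caller itself
    // is only chosen when it is the single remaining thread.
    static Optional<String> pick(final List<String> aliveThreadNames, final String callerName) {
        for (final String name : aliveThreadNames) {
            if (!name.equals(callerName) || aliveThreadNames.size() == 1) {
                return Optional.of(name);
            }
        }
        return Optional.empty();
    }

    // A thread must not block until its own state is DEAD: it cannot finish
    // shutting down while it is still waiting inside the remove call.
    static boolean shouldBlockUntilDead(final String chosenName, final String callerName) {
        return !chosenName.equals(callerName);
    }
}
```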

##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -924,22 +924,64 @@ private StreamThread createStreamThread(final long cacheSizePerThread, final int
      * @return name of the added stream thread or empty if a new stream thread could not be added
      */
     public Optional<String> addStreamThread() {
-        synchronized (changeThreadCount) {
-            if (isRunningOrRebalancing()) {
-                final int threadIdx = getNextThreadIndex();
-                final long cacheSizePerThread = getCacheSizePerThread(threads.size() + 1);
+        if (isRunningOrRebalancing()) {
+            final int threadIdx;
+            final long cacheSizePerThread;
+            synchronized (changeThreadCount) {
+                threadIdx = getNextThreadIndex();
+                cacheSizePerThread = getCacheSizePerThread(threads.size() + 1);
                 resizeThreadCache(cacheSizePerThread);
-                final StreamThread streamThread = createStreamThread(cacheSizePerThread, threadIdx);
-                synchronized (stateLock) {
-                    if (isRunningOrRebalancing()) {
-                        streamThread.start();
-                        return Optional.of(streamThread.getName());
-                    } else {
-                        streamThread.shutdown();
+            }
+            final StreamThread streamThread = createStreamThread(cacheSizePerThread, threadIdx);
+
+            synchronized (stateLock) {
+                if (isRunningOrRebalancing()) {
+                    streamThread.start();
+                    return Optional.of(streamThread.getName());
+                } else {
+                    streamThread.shutdown();
+                    threads.remove(streamThread);
+                    resizeThreadCache(getCacheSizePerThread(threads.size()));
+                    return Optional.empty();
+                }
+            }
+        }
+        return Optional.empty();
+    }
+
+    /**
+     * Removes one stream thread out of the running stream threads from this Kafka Streams client.
+     * <p>
+     * The removed stream thread is gracefully shut down. This method does not specify which stream
+     * thread is shut down.
+     * <p>
+     * Since the number of stream threads decreases, the sizes of the caches in the remaining stream
+     * threads are adapted so that the sum of the cache sizes over all stream threads equals the total
+     * cache size specified in configuration {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG}.
+     *
+     * @return name of the removed stream thread or empty if a stream thread could not be removed because
+     *         no stream threads are alive
+     */
+    public Optional<String> removeStreamThread() {
+        if (isRunningOrRebalancing()) {
+            for (final StreamThread streamThread : threads) {
+                if (streamThread.isAlive()) {
+                    streamThread.shutdown();
+                    while (streamThread.state() != StreamThread.State.DEAD && !streamThread.getName().equals(Thread.currentThread().getName())) {
+                        try {
+                            synchronized (streamThread.state()) {
+                                streamThread.state().wait(100);

Review comment:
       good idea







[GitHub] [kafka] wcarlson5 commented on a change in pull request #9695: KAFKA-10500: Remove thread

Posted by GitBox <gi...@apache.org>.
wcarlson5 commented on a change in pull request #9695:
URL: https://github.com/apache/kafka/pull/9695#discussion_r538938550



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -924,28 +924,69 @@ private StreamThread createStreamThread(final long cacheSizePerThread, final int
      * @return name of the added stream thread or empty if a new stream thread could not be added
      */
     public Optional<String> addStreamThread() {
-        synchronized (changeThreadCount) {
-            if (isRunningOrRebalancing()) {
-                final int threadIdx = getNextThreadIndex();
-                final long cacheSizePerThread = getCacheSizePerThread(threads.size() + 1);
+        if (isRunningOrRebalancing()) {
+            final int threadIdx;
+            final long cacheSizePerThread;
+            synchronized (changeThreadCount) {
+                threadIdx = getNextThreadIndex();
+                cacheSizePerThread = getCacheSizePerThread(threads.size() + 1);
                 resizeThreadCache(cacheSizePerThread);
-                final StreamThread streamThread = createStreamThread(cacheSizePerThread, threadIdx);
-                synchronized (stateLock) {
-                    if (isRunningOrRebalancing()) {
-                        streamThread.start();
-                        return Optional.of(streamThread.getName());
-                    } else {
-                        streamThread.shutdown();
+            }
+            final StreamThread streamThread = createStreamThread(cacheSizePerThread, threadIdx);
+
+            synchronized (stateLock) {
+                if (isRunningOrRebalancing()) {
+                    streamThread.start();
+                    return Optional.of(streamThread.getName());
+                } else {
+                    streamThread.shutdown();
+                    threads.remove(streamThread);
+                    resizeThreadCache(getCacheSizePerThread(threads.size()));
+                    return Optional.empty();
+                }
+            }
+        }
+        return Optional.empty();
+    }
+
+    /**
+     * Removes one stream thread out of the running stream threads from this Kafka Streams client.
+     *
+     * The removed stream thread is gracefully shut down. This method does not specify which stream
+     * thread is shut down.
+     *
+     * Since the number of stream threads decreases, the sizes of the caches in the remaining stream
+     * threads are adapted so that the sum of the cache sizes over all stream threads equals the total
+     * cache size specified in configuration {@code cache.max.bytes.buffering}.
+     *
+     * @return name of the removed stream thread or empty if a stream thread could not be removed because
+     *         no stream threads are alive
+     */
+    public Optional<String> removeStreamThread() {
+        if (isRunningOrRebalancing()) {
+            for (final StreamThread streamThread : threads) {
+                if (streamThread.isAlive()) {
+                    streamThread.shutdown();
+                    while (streamThread.state() != StreamThread.State.DEAD && !streamThread.getName().equals(Thread.currentThread().getName())) {

Review comment:
       I think we have to put the thread id back on the menu. Otherwise, when we replace the thread, we will not be able to use the same id. In the tests for `REPLACE_THREAD` there have been no problems with the thread that is shutting down having the same name as the thread that is starting. But for a call to `removeStreamThread()` it is probably best not to resize until shutdown is complete.
   
   I think the OOM risk is acceptable. The thread shutdown is much simpler than a client shutdown and should not use more space, and the new thread should not have any tasks assigned until the old thread leaves the group.
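
To make the resize-ordering concern concrete, here is a small arithmetic sketch. It assumes the total cache is split evenly across stream threads and ignores any global thread; `CacheSizing` and its methods are hypothetical and are not the PR's `getCacheSizePerThread`.

```java
class CacheSizing {
    // Even split of the configured total across the given number of threads.
    static long cacheSizePerThread(final long totalCacheBytes, final int threadCount) {
        if (threadCount <= 0) {
            throw new IllegalArgumentException("threadCount must be positive");
        }
        return totalCacheBytes / threadCount;
    }

    // Upper bound on the combined cache limits if the surviving threads are
    // resized before the removed thread has released its old share.
    static long peakBytesIfResizedEarly(final long totalCacheBytes, final int threadCountBefore) {
        final long oldShare = cacheSizePerThread(totalCacheBytes, threadCountBefore);
        final long newShare = cacheSizePerThread(totalCacheBytes, threadCountBefore - 1);
        return newShare * (threadCountBefore - 1) + oldShare;
    }
}
```

With a 12,000,000-byte total and three threads, each share is 4,000,000 bytes. Resizing the two survivors to 6,000,000 bytes while the third thread still holds its 4,000,000 bytes gives a temporary limit of 16,000,000 bytes, which is the overshoot that waiting for shutdown avoids. These are limits rather than measured usage, so the real peak may be lower.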







[GitHub] [kafka] wcarlson5 commented on a change in pull request #9695: KAFKA-10500: Remove thread

Posted by GitBox <gi...@apache.org>.
wcarlson5 commented on a change in pull request #9695:
URL: https://github.com/apache/kafka/pull/9695#discussion_r553490127



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -1432,7 +1480,9 @@ public void cleanUp() {
         validateIsRunningOrRebalancing();
         final Set<ThreadMetadata> threadMetadata = new HashSet<>();
         for (final StreamThread thread : threads) {
-            threadMetadata.add(thread.threadMetadata());
+            if (thread.state() != StreamThread.State.DEAD) {
+                threadMetadata.add(thread.threadMetadata());
+            }
         }
         return threadMetadata;

Review comment:
       `changeThreadCount` would not take care of that; we need to synchronize on the thread state.







[GitHub] [kafka] wcarlson5 commented on pull request #9695: KAFKA-10500: Remove thread

Posted by GitBox <gi...@apache.org>.
wcarlson5 commented on pull request #9695:
URL: https://github.com/apache/kafka/pull/9695#issuecomment-742706985


   @cadonna 
   
   > In the KIP, it says that a rebalance is triggered even under static membership. I could not find any code that ensures that. Shall we do that in a separate PR?
   
   I did not see that. I think it could complicate this PR more than necessary, so perhaps it belongs in another PR.
   
   > According to the KIP KafkaStreams#localThreadsMetadata() should not return metadata for dead stream threads. Should that also be part of this PR?
   
   We achieve this by removing threads from the thread list when they die.





[GitHub] [kafka] ableegoldman merged pull request #9695: KAFKA-10500: Remove thread

Posted by GitBox <gi...@apache.org>.
ableegoldman merged pull request #9695:
URL: https://github.com/apache/kafka/pull/9695


   





[GitHub] [kafka] wcarlson5 commented on a change in pull request #9695: KAFKA-10500: Remove thread

Posted by GitBox <gi...@apache.org>.
wcarlson5 commented on a change in pull request #9695:
URL: https://github.com/apache/kafka/pull/9695#discussion_r553561353



##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java
##########
@@ -128,6 +133,7 @@ public void shouldRemoveStreamThread() throws Exception {
             final int oldThreadCount = kafkaStreams.localThreadsMetadata().size();
             assertThat(kafkaStreams.removeStreamThread().get().split("-")[0], equalTo(appId));
             assertThat(kafkaStreams.localThreadsMetadata().size(), equalTo(oldThreadCount - 1));
+            waitForApplicationState(Collections.singletonList(kafkaStreams), KafkaStreams.State.RUNNING, DEFAULT_DURATION);

Review comment:
       added







[GitHub] [kafka] wcarlson5 commented on a change in pull request #9695: KAFKA-10500: Remove thread

Posted by GitBox <gi...@apache.org>.
wcarlson5 commented on a change in pull request #9695:
URL: https://github.com/apache/kafka/pull/9695#discussion_r541056641



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -924,25 +924,64 @@ private StreamThread createStreamThread(final long cacheSizePerThread, final int
      * @return name of the added stream thread or empty if a new stream thread could not be added
      */
     public Optional<String> addStreamThread() {
-        synchronized (changeThreadCount) {
-            if (isRunningOrRebalancing()) {
-                final int threadIdx = getNextThreadIndex();
-                final long cacheSizePerThread = getCacheSizePerThread(threads.size() + 1);
+        if (isRunningOrRebalancing()) {
+            final int threadIdx;
+            final long cacheSizePerThread;
+            synchronized (changeThreadCount) {
+                threadIdx = getNextThreadIndex();
+                cacheSizePerThread = getCacheSizePerThread(threads.size() + 1);
                 resizeThreadCache(cacheSizePerThread);
-                final StreamThread streamThread = createStreamThread(cacheSizePerThread, threadIdx);
-                synchronized (stateLock) {
-                    if (isRunningOrRebalancing()) {
-                        streamThread.start();
-                        return Optional.of(streamThread.getName());
-                    } else {
-                        streamThread.shutdown();
+            }
+            final StreamThread streamThread = createStreamThread(cacheSizePerThread, threadIdx);
+
+            synchronized (stateLock) {
+                if (isRunningOrRebalancing()) {
+                    streamThread.start();
+                    return Optional.of(streamThread.getName());
+                } else {
+                    streamThread.shutdown();
+                    threads.remove(streamThread);
+                    resizeThreadCache(getCacheSizePerThread(threads.size()));
+                    log.warn("Cannot add a stream thread in state " + state());
+                    return Optional.empty();
+                }
+            }
+        }
+        log.warn("Cannot add a stream thread in state " + state());
+        return Optional.empty();
+    }
+
+    /**
+     * Removes one stream thread out of the running stream threads from this Kafka Streams client.
+     * <p>
+     * The removed stream thread is gracefully shut down. This method does not specify which stream
+     * thread is shut down.
+     * <p>
+     * Since the number of stream threads decreases, the sizes of the caches in the remaining stream
+     * threads are adapted so that the sum of the cache sizes over all stream threads equals the total
+     * cache size specified in configuration {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG}.
+     *
+     * @return name of the removed stream thread or empty if a stream thread could not be removed because
+     *         no stream threads are alive
+     */
+    public Optional<String> removeStreamThread() {
+        if (isRunningOrRebalancing()) {
+            for (final StreamThread streamThread : threads) {
+                if (streamThread.isAlive()) {
+                    streamThread.shutdown();
+                    if (!streamThread.getName().equals(Thread.currentThread().getName())) {

Review comment:
       I think it would be odd for us to allow an outside thread to remove the last thread but not allow a thread to remove itself.







[GitHub] [kafka] cadonna commented on a change in pull request #9695: KAFKA-10500: Remove thread

Posted by GitBox <gi...@apache.org>.
cadonna commented on a change in pull request #9695:
URL: https://github.com/apache/kafka/pull/9695#discussion_r540854106



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -924,25 +924,64 @@ private StreamThread createStreamThread(final long cacheSizePerThread, final int
      * @return name of the added stream thread or empty if a new stream thread could not be added
      */
     public Optional<String> addStreamThread() {
-        synchronized (changeThreadCount) {
-            if (isRunningOrRebalancing()) {
-                final int threadIdx = getNextThreadIndex();
-                final long cacheSizePerThread = getCacheSizePerThread(threads.size() + 1);
+        if (isRunningOrRebalancing()) {
+            final int threadIdx;
+            final long cacheSizePerThread;
+            synchronized (changeThreadCount) {
+                threadIdx = getNextThreadIndex();
+                cacheSizePerThread = getCacheSizePerThread(threads.size() + 1);
                 resizeThreadCache(cacheSizePerThread);
-                final StreamThread streamThread = createStreamThread(cacheSizePerThread, threadIdx);
-                synchronized (stateLock) {
-                    if (isRunningOrRebalancing()) {
-                        streamThread.start();
-                        return Optional.of(streamThread.getName());
-                    } else {
-                        streamThread.shutdown();
+            }
+            final StreamThread streamThread = createStreamThread(cacheSizePerThread, threadIdx);
+
+            synchronized (stateLock) {
+                if (isRunningOrRebalancing()) {
+                    streamThread.start();
+                    return Optional.of(streamThread.getName());
+                } else {
+                    streamThread.shutdown();
+                    threads.remove(streamThread);
+                    resizeThreadCache(getCacheSizePerThread(threads.size()));
+                    log.warn("Cannot add a stream thread in state " + state());
+                    return Optional.empty();
+                }
+            }
+        }
+        log.warn("Cannot add a stream thread in state " + state());
+        return Optional.empty();
+    }
+
+    /**
+     * Removes one stream thread out of the running stream threads from this Kafka Streams client.
+     * <p>
+     * The removed stream thread is gracefully shut down. This method does not specify which stream
+     * thread is shut down.
+     * <p>
+     * Since the number of stream threads decreases, the sizes of the caches in the remaining stream
+     * threads are adapted so that the sum of the cache sizes over all stream threads equals the total
+     * cache size specified in configuration {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG}.
+     *
+     * @return name of the removed stream thread or empty if a stream thread could not be removed because
+     *         no stream threads are alive
+     */
+    public Optional<String> removeStreamThread() {
+        if (isRunningOrRebalancing()) {
+            for (final StreamThread streamThread : threads) {
+                if (streamThread.isAlive()) {
+                    streamThread.shutdown();
+                    if (!streamThread.getName().equals(Thread.currentThread().getName())) {

Review comment:
       That would still work, but not if the non-global stream threads are removed by a non-global stream thread. I can think of a use case where a stream thread is removed by another stream thread when a special record is processed. Do we want to support such a use case?







[GitHub] [kafka] ableegoldman commented on a change in pull request #9695: KAFKA-10500: Remove thread

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #9695:
URL: https://github.com/apache/kafka/pull/9695#discussion_r553006191



##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java
##########
@@ -97,15 +99,132 @@ public void shouldAddStreamThread() throws Exception {
 
             final Optional<String> name = kafkaStreams.addStreamThread();
 
-            assertThat(name, CoreMatchers.not(Optional.empty()));
+            assertThat(name, not(Optional.empty()));
             TestUtils.waitForCondition(
                 () -> kafkaStreams.localThreadsMetadata().stream().sequential()
                         .map(ThreadMetadata::threadName).anyMatch(t -> t.equals(name.orElse(""))),
                 "Wait for the thread to be added"
             );
             assertThat(kafkaStreams.localThreadsMetadata().size(), equalTo(oldThreadCount + 1));
-            assertThat(kafkaStreams.localThreadsMetadata().stream().map(t -> t.threadName().split("-StreamThread-")[1]).sorted().toArray(), equalTo(new String[] {"1", "2", "3"}));
-            TestUtils.waitForCondition(() -> kafkaStreams.state() == KafkaStreams.State.RUNNING, "wait for running");
+            assertThat(
+                kafkaStreams
+                    .localThreadsMetadata()
+                    .stream()
+                    .map(t -> t.threadName().split("-StreamThread-")[1])
+                    .sorted().toArray(),
+                equalTo(new String[] {"1", "2", "3"})
+            );
+            TestUtils.waitForCondition(
+                () -> kafkaStreams.state() == KafkaStreams.State.RUNNING,
+                "Kafka Streams client did not reach state RUNNING"
+            );
+        }
+    }
+
+    @Test
+    public void shouldRemoveStreamThread() throws Exception {

Review comment:
       One more corner case we should add test coverage for is what happens when we get down to 0 threads. Two things to verify that I can think of are: (1) that the client stays in RUNNING, and (2) that we can add more threads again after we've been idling with zero threads for a short while (and, as always, that it goes into REBALANCING --> RUNNING after adding the threads).







[GitHub] [kafka] ableegoldman commented on a change in pull request #9695: KAFKA-10500: Remove thread

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #9695:
URL: https://github.com/apache/kafka/pull/9695#discussion_r537976391



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -924,22 +924,64 @@ private StreamThread createStreamThread(final long cacheSizePerThread, final int
      * @return name of the added stream thread or empty if a new stream thread could not be added
      */
     public Optional<String> addStreamThread() {
-        synchronized (changeThreadCount) {
-            if (isRunningOrRebalancing()) {
-                final int threadIdx = getNextThreadIndex();
-                final long cacheSizePerThread = getCacheSizePerThread(threads.size() + 1);
+        if (isRunningOrRebalancing()) {
+            final int threadIdx;
+            final long cacheSizePerThread;
+            synchronized (changeThreadCount) {
+                threadIdx = getNextThreadIndex();
+                cacheSizePerThread = getCacheSizePerThread(threads.size() + 1);
                 resizeThreadCache(cacheSizePerThread);
-                final StreamThread streamThread = createStreamThread(cacheSizePerThread, threadIdx);
-                synchronized (stateLock) {
-                    if (isRunningOrRebalancing()) {
-                        streamThread.start();
-                        return Optional.of(streamThread.getName());
-                    } else {
-                        streamThread.shutdown();
+            }
+            final StreamThread streamThread = createStreamThread(cacheSizePerThread, threadIdx);
+
+            synchronized (stateLock) {
+                if (isRunningOrRebalancing()) {
+                    streamThread.start();
+                    return Optional.of(streamThread.getName());
+                } else {
+                    streamThread.shutdown();
+                    threads.remove(streamThread);
+                    resizeThreadCache(getCacheSizePerThread(threads.size()));
+                    return Optional.empty();
+                }
+            }
+        }
+        return Optional.empty();
+    }
+
+    /**
+     * Removes one stream thread out of the running stream threads from this Kafka Streams client.
+     * <p>
+     * The removed stream thread is gracefully shut down. This method does not specify which stream
+     * thread is shut down.
+     * <p>
+     * Since the number of stream threads decreases, the sizes of the caches in the remaining stream
+     * threads are adapted so that the sum of the cache sizes over all stream threads equals the total
+     * cache size specified in configuration {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG}.
+     *
+     * @return name of the removed stream thread or empty if a stream thread could not be removed because
+     *         no stream threads are alive
+     */
+    public Optional<String> removeStreamThread() {
+        if (isRunningOrRebalancing()) {
+            for (final StreamThread streamThread : threads) {
+                if (streamThread.isAlive()) {
+                    streamThread.shutdown();
+                    while (streamThread.state() != StreamThread.State.DEAD && !streamThread.getName().equals(Thread.currentThread().getName())) {
+                        try {
+                            synchronized (streamThread.state()) {
+                                streamThread.state().wait(100);
+                            }
+                        } catch (final InterruptedException e) {
+                            e.printStackTrace();

Review comment:
       Actually, now I'm wondering: should we even do the cleanup if the waiting thread is interrupted before the shutdown has completed? It definitely seems like we shouldn't remove the thread from `threads`, since that would free up the id for a new thread, and ids have to be unique. And resizing the cache might be risky if the removed thread hasn't cleared its own cache yet.
   
   I guess we could have the thread handle this cleanup itself once it reaches the end of the shutdown...but maybe the priority should be to keep the code simple. Burning a thread id probably isn't so bad, but permanently losing its share of the cache memory is. WDYT?
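
To make the cache trade-off above concrete, here is a minimal sketch of the arithmetic. It assumes the total cache is split evenly across the live stream threads; the helper `cacheSizePerThread` and the 10 MiB total are illustrative assumptions, not the actual `KafkaStreams` implementation.

```java
public class CacheShareSketch {

    // Hypothetical helper: split the total cache evenly across the given number of threads.
    static long cacheSizePerThread(final long totalCacheBytes, final int numThreads) {
        if (numThreads <= 0) {
            throw new IllegalArgumentException("numThreads must be positive");
        }
        return totalCacheBytes / numThreads;
    }

    public static void main(final String[] args) {
        final long total = 10L * 1024 * 1024; // assumed total cache size (10 MiB)

        // Three threads each hold an equal share of the cache.
        final long shareWithThree = cacheSizePerThread(total, 3);

        // One thread is removed but the remaining two are never resized:
        // the removed thread's share is no longer usable by anyone.
        final long usedWithoutResize = 2 * shareWithThree;

        // One thread is removed and the remaining two are resized.
        final long usedWithResize = 2 * cacheSizePerThread(total, 2);

        System.out.println("bytes usable without resize: " + usedWithoutResize);
        System.out.println("bytes usable with resize:    " + usedWithResize);
        System.out.println("bytes lost without resize:   " + (total - usedWithoutResize));
    }
}
```

Under this assumption, skipping the resize leaves roughly one thread's share of the cache unused for the lifetime of the client, whereas skipping the removal from `threads` only costs a thread id.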







[GitHub] [kafka] ableegoldman commented on a change in pull request #9695: KAFKA-10500: Remove thread

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #9695:
URL: https://github.com/apache/kafka/pull/9695#discussion_r553007907



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -947,24 +948,65 @@ private StreamThread createStreamThread(final long cacheSizePerThread, final int
      * @return name of the added stream thread or empty if a new stream thread could not be added
      */
     public Optional<String> addStreamThread() {
-        synchronized (changeThreadCount) {
-            if (isRunningOrRebalancing()) {
-                final int threadIdx = getNextThreadIndex();
-                final long cacheSizePerThread = getCacheSizePerThread(threads.size() + 1);
+        if (isRunningOrRebalancing()) {
+            final int threadIdx;
+            final long cacheSizePerThread;
+            synchronized (changeThreadCount) {
+                threadIdx = getNextThreadIndex();
+                cacheSizePerThread = getCacheSizePerThread(threads.size() + 1);
                 resizeThreadCache(cacheSizePerThread);
-                final StreamThread streamThread = createStreamThread(cacheSizePerThread, threadIdx);
-                synchronized (stateLock) {
-                    if (isRunningOrRebalancing()) {
-                        streamThread.start();
-                        return Optional.of(streamThread.getName());
-                    } else {
+            }
+            final StreamThread streamThread = createStreamThread(cacheSizePerThread, threadIdx);
+
+            synchronized (stateLock) {
+                if (isRunningOrRebalancing()) {
+                    streamThread.start();
+                    return Optional.of(streamThread.getName());
+                } else {
+                    streamThread.shutdown();
+                    threads.remove(streamThread);
+                    resizeThreadCache(getCacheSizePerThread(threads.size()));
+                    log.warn("Cannot add a stream thread in state " + state());
+                    return Optional.empty();
+                }
+            }
+        }
+        log.warn("Cannot add a stream thread in state " + state());
+        return Optional.empty();
+    }
+
+    /**
+     * Removes one stream thread out of the running stream threads from this Kafka Streams client.
+     * <p>
+     * The removed stream thread is gracefully shut down. This method does not specify which stream
+     * thread is shut down.
+     * <p>
+     * Since the number of stream threads decreases, the sizes of the caches in the remaining stream
+     * threads are adapted so that the sum of the cache sizes over all stream threads equals the total
+     * cache size specified in configuration {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG}.
+     *
+     * @return name of the removed stream thread or empty if a stream thread could not be removed because
+     *         no stream threads are alive
+     */
+    public Optional<String> removeStreamThread() {
+        if (isRunningOrRebalancing()) {
+            synchronized (changeThreadCount) {
+                for (final StreamThread streamThread : threads) {
+                    if (streamThread.isAlive()) {
                         streamThread.shutdown();
+                        if (!streamThread.getName().equals(Thread.currentThread().getName())) {

Review comment:
       Sorry, my suggestion was vaguely worded. I meant that we should only let a StreamThread remove itself if we detect that `removeStreamThread` has been called by a StreamThread (which should be possible). If it's called by something else, then yes, we should just pick a random thread to remove. I think that addresses your concern in the first sentence of your reply, but let me know if I'm misinterpreting it.
   
   >I do not like the idea of not letting a thread remove itself because, if we have one stream thread left not letting that thread remove it self (maybe leaving a global thread) this seems incomplete.
   
   That's a good point. In that case I would advocate for _only_ letting a StreamThread remove itself. Do you see any problems with that?
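
A minimal sketch of the selection rule from the first paragraph above: if the caller is itself one of the managed threads, it removes itself; otherwise some other thread is chosen. The class and method names are hypothetical, threads are represented only by their names, and "pick a random thread" is simplified to "pick the first one"; none of this is taken from `KafkaStreams`.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

public class SelfRemovalSketch {

    // Hypothetical helper: choose which managed thread to remove.
    // If the caller is a managed thread, it is chosen; otherwise the first managed thread is chosen.
    static Optional<String> pickThreadToRemove(final List<String> managedThreadNames,
                                               final String callerName) {
        if (managedThreadNames.contains(callerName)) {
            return Optional.of(callerName);
        }
        return managedThreadNames.stream().findFirst();
    }

    public static void main(final String[] args) {
        final List<String> managed = Arrays.asList("app-StreamThread-1", "app-StreamThread-2");

        // Called from an outside thread (here: the main thread), so the first managed thread is picked.
        System.out.println(pickThreadToRemove(managed, Thread.currentThread().getName()));

        // Called from a managed thread, so that thread picks itself.
        System.out.println(pickThreadToRemove(managed, "app-StreamThread-2"));
    }
}
```

The stricter variant proposed in the last paragraph (only ever letting a StreamThread remove itself) would return `Optional.empty()` instead of falling back to another thread.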







[GitHub] [kafka] wcarlson5 commented on pull request #9695: KAFKA-10500: Remove thread

Posted by GitBox <gi...@apache.org>.
wcarlson5 commented on pull request #9695:
URL: https://github.com/apache/kafka/pull/9695#issuecomment-754252489


   @ableegoldman I addressed your concern about the list synchronization with the iterator, and I added some tests. I think it is good to merge. WDYT?





[GitHub] [kafka] ableegoldman commented on a change in pull request #9695: KAFKA-10500: Remove thread

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #9695:
URL: https://github.com/apache/kafka/pull/9695#discussion_r537974578



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -924,22 +924,64 @@ private StreamThread createStreamThread(final long cacheSizePerThread, final int
      * @return name of the added stream thread or empty if a new stream thread could not be added
      */
     public Optional<String> addStreamThread() {
-        synchronized (changeThreadCount) {
-            if (isRunningOrRebalancing()) {
-                final int threadIdx = getNextThreadIndex();
-                final long cacheSizePerThread = getCacheSizePerThread(threads.size() + 1);
+        if (isRunningOrRebalancing()) {
+            final int threadIdx;
+            final long cacheSizePerThread;
+            synchronized (changeThreadCount) {
+                threadIdx = getNextThreadIndex();
+                cacheSizePerThread = getCacheSizePerThread(threads.size() + 1);
                 resizeThreadCache(cacheSizePerThread);
-                final StreamThread streamThread = createStreamThread(cacheSizePerThread, threadIdx);
-                synchronized (stateLock) {
-                    if (isRunningOrRebalancing()) {
-                        streamThread.start();
-                        return Optional.of(streamThread.getName());
-                    } else {
-                        streamThread.shutdown();
+            }
+            final StreamThread streamThread = createStreamThread(cacheSizePerThread, threadIdx);
+
+            synchronized (stateLock) {
+                if (isRunningOrRebalancing()) {
+                    streamThread.start();
+                    return Optional.of(streamThread.getName());
+                } else {
+                    streamThread.shutdown();
+                    threads.remove(streamThread);
+                    resizeThreadCache(getCacheSizePerThread(threads.size()));
+                    return Optional.empty();
+                }
+            }
+        }
+        return Optional.empty();
+    }
+
+    /**
+     * Removes one stream thread out of the running stream threads from this Kafka Streams client.
+     * <p>
+     * The removed stream thread is gracefully shut down. This method does not specify which stream
+     * thread is shut down.
+     * <p>
+     * Since the number of stream threads decreases, the sizes of the caches in the remaining stream
+     * threads are adapted so that the sum of the cache sizes over all stream threads equals the total
+     * cache size specified in configuration {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG}.
+     *
+     * @return name of the removed stream thread or empty if a stream thread could not be removed because
+     *         no stream threads are alive
+     */
+    public Optional<String> removeStreamThread() {
+        if (isRunningOrRebalancing()) {
+            for (final StreamThread streamThread : threads) {
+                if (streamThread.isAlive()) {
+                    streamThread.shutdown();
+                    while (streamThread.state() != StreamThread.State.DEAD && !streamThread.getName().equals(Thread.currentThread().getName())) {
+                        try {
+                            synchronized (streamThread.state()) {
+                                streamThread.state().wait(100);

Review comment:
       Maybe we could follow a pattern similar to `KafkaStreams#waitOnState` and use `notify` instead of waking up every 100 ms.
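
As a rough illustration of the suggestion above, the following sketch waits on a dedicated lock object and is woken by `notifyAll` when the state changes, rather than polling on a fixed interval. The class, the `State` enum, and the method names are hypothetical stand-ins; this is not the actual `KafkaStreams#waitOnState` code.

```java
public class StateWaiterSketch {

    enum State { RUNNING, PENDING_SHUTDOWN, DEAD }

    // Dedicated lock object guarding the state field.
    private final Object stateLock = new Object();
    private State state = State.RUNNING;

    // Called by whoever changes the state; wakes up every thread waiting on the lock.
    void setState(final State newState) {
        synchronized (stateLock) {
            state = newState;
            stateLock.notifyAll();
        }
    }

    // Blocks until the target state is reached or the timeout elapses.
    // Returns true if the target state was reached, false on timeout or interruption.
    boolean waitOnState(final State target, final long timeoutMs) {
        final long deadline = System.currentTimeMillis() + timeoutMs;
        synchronized (stateLock) {
            while (state != target) {
                final long remaining = deadline - System.currentTimeMillis();
                if (remaining <= 0) {
                    return false;
                }
                try {
                    stateLock.wait(remaining);
                } catch (final InterruptedException e) {
                    // Restore the interrupt flag so the caller can react to it.
                    Thread.currentThread().interrupt();
                    return false;
                }
            }
            return true;
        }
    }

    public static void main(final String[] args) throws InterruptedException {
        final StateWaiterSketch worker = new StateWaiterSketch();
        final Thread shutdown = new Thread(() -> worker.setState(State.DEAD));
        shutdown.start();
        final boolean reached = worker.waitOnState(State.DEAD, 5_000L);
        shutdown.join();
        System.out.println("reached DEAD: " + reached);
    }
}
```

The loop re-checks the condition after every wake-up, which guards against spurious wake-ups. Note also that the diff under review synchronizes on the value returned by `streamThread.state()`, an enum constant that no other code notifies, so a dedicated lock object as above is the usual alternative.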







[GitHub] [kafka] wcarlson5 commented on a change in pull request #9695: KAFKA-10500: Remove thread

Posted by GitBox <gi...@apache.org>.
wcarlson5 commented on a change in pull request #9695:
URL: https://github.com/apache/kafka/pull/9695#discussion_r538934137



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -924,22 +924,64 @@ private StreamThread createStreamThread(final long cacheSizePerThread, final int
      * @return name of the added stream thread or empty if a new stream thread could not be added
      */
     public Optional<String> addStreamThread() {
-        synchronized (changeThreadCount) {
-            if (isRunningOrRebalancing()) {
-                final int threadIdx = getNextThreadIndex();
-                final long cacheSizePerThread = getCacheSizePerThread(threads.size() + 1);
+        if (isRunningOrRebalancing()) {
+            final int threadIdx;
+            final long cacheSizePerThread;
+            synchronized (changeThreadCount) {
+                threadIdx = getNextThreadIndex();
+                cacheSizePerThread = getCacheSizePerThread(threads.size() + 1);
                 resizeThreadCache(cacheSizePerThread);
-                final StreamThread streamThread = createStreamThread(cacheSizePerThread, threadIdx);
-                synchronized (stateLock) {
-                    if (isRunningOrRebalancing()) {
-                        streamThread.start();
-                        return Optional.of(streamThread.getName());
-                    } else {
-                        streamThread.shutdown();
+            }
+            final StreamThread streamThread = createStreamThread(cacheSizePerThread, threadIdx);
+
+            synchronized (stateLock) {
+                if (isRunningOrRebalancing()) {
+                    streamThread.start();
+                    return Optional.of(streamThread.getName());
+                } else {
+                    streamThread.shutdown();
+                    threads.remove(streamThread);
+                    resizeThreadCache(getCacheSizePerThread(threads.size()));
+                    return Optional.empty();
+                }
+            }
+        }
+        return Optional.empty();
+    }
+
+    /**
+     * Removes one stream thread out of the running stream threads from this Kafka Streams client.
+     * <p>
+     * The removed stream thread is gracefully shut down. This method does not specify which stream
+     * thread is shut down.
+     * <p>
+     * Since the number of stream threads decreases, the sizes of the caches in the remaining stream
+     * threads are adapted so that the sum of the cache sizes over all stream threads equals the total
+     * cache size specified in configuration {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG}.
+     *
+     * @return name of the removed stream thread or empty if a stream thread could not be removed because
+     *         no stream threads are alive
+     */
+    public Optional<String> removeStreamThread() {
+        if (isRunningOrRebalancing()) {
+            for (final StreamThread streamThread : threads) {

Review comment:
       https://github.com/apache/kafka/blob/338045c9b62c3b6154789d9cfa9a1cf104188e16/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java#L825
   
   The list access is synchronized. I think that using an iterator, such as in a for-each loop, should be safe. And since we are not actually making changes outside of the `synchronized (changeThreadCount)` block, those shouldn't cause a problem.
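
For reference, if `threads` is a `Collections.synchronizedList` wrapper (an assumption here, since the linked line is not reproduced in this thread), the JDK documentation requires callers to synchronize on the list manually while iterating; only the individual method calls are synchronized by the wrapper. A minimal sketch with hypothetical names:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;

public class SynchronizedListIterationSketch {

    // Returns the first name starting with the given prefix.
    // The whole traversal holds the list's monitor, so concurrent add/remove calls
    // on the synchronized wrapper cannot interleave with the iteration.
    static Optional<String> firstMatching(final List<String> names, final String prefix) {
        synchronized (names) {
            for (final String name : names) {
                if (name.startsWith(prefix)) {
                    return Optional.of(name);
                }
            }
        }
        return Optional.empty();
    }

    public static void main(final String[] args) {
        final List<String> threadNames = Collections.synchronizedList(new ArrayList<>());
        threadNames.add("app-StreamThread-1");
        threadNames.add("app-StreamThread-2");
        System.out.println(firstMatching(threadNames, "app-StreamThread-2"));
    }
}
```

Without the `synchronized (names)` block, a concurrent structural modification during the for-each loop can surface as a `ConcurrentModificationException`, which is why the iteration in `removeStreamThread` is worth guarding with the same lock that protects modifications.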







[GitHub] [kafka] ableegoldman commented on a change in pull request #9695: KAFKA-10500: Remove thread

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #9695:
URL: https://github.com/apache/kafka/pull/9695#discussion_r553016211



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -947,24 +948,65 @@ private StreamThread createStreamThread(final long cacheSizePerThread, final int
      * @return name of the added stream thread or empty if a new stream thread could not be added
      */
     public Optional<String> addStreamThread() {
-        synchronized (changeThreadCount) {
-            if (isRunningOrRebalancing()) {
-                final int threadIdx = getNextThreadIndex();
-                final long cacheSizePerThread = getCacheSizePerThread(threads.size() + 1);
+        if (isRunningOrRebalancing()) {
+            final int threadIdx;
+            final long cacheSizePerThread;
+            synchronized (changeThreadCount) {
+                threadIdx = getNextThreadIndex();
+                cacheSizePerThread = getCacheSizePerThread(threads.size() + 1);
                 resizeThreadCache(cacheSizePerThread);
-                final StreamThread streamThread = createStreamThread(cacheSizePerThread, threadIdx);
-                synchronized (stateLock) {
-                    if (isRunningOrRebalancing()) {
-                        streamThread.start();
-                        return Optional.of(streamThread.getName());
-                    } else {
+            }
+            final StreamThread streamThread = createStreamThread(cacheSizePerThread, threadIdx);
+
+            synchronized (stateLock) {
+                if (isRunningOrRebalancing()) {
+                    streamThread.start();
+                    return Optional.of(streamThread.getName());
+                } else {
+                    streamThread.shutdown();
+                    threads.remove(streamThread);
+                    resizeThreadCache(getCacheSizePerThread(threads.size()));
+                    log.warn("Cannot add a stream thread in state " + state());
+                    return Optional.empty();
+                }
+            }
+        }
+        log.warn("Cannot add a stream thread in state " + state());
+        return Optional.empty();
+    }
+
+    /**
+     * Removes one stream thread out of the running stream threads from this Kafka Streams client.
+     * <p>
+     * The removed stream thread is gracefully shut down. This method does not specify which stream
+     * thread is shut down.
+     * <p>
+     * Since the number of stream threads decreases, the sizes of the caches in the remaining stream
+     * threads are adapted so that the sum of the cache sizes over all stream threads equals the total
+     * cache size specified in configuration {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG}.
+     *
+     * @return name of the removed stream thread or empty if a stream thread could not be removed because
+     *         no stream threads are alive
+     */
+    public Optional<String> removeStreamThread() {
+        if (isRunningOrRebalancing()) {
+            synchronized (changeThreadCount) {
+                for (final StreamThread streamThread : threads) {
+                    if (streamThread.isAlive()) {
                         streamThread.shutdown();
+                        if (!streamThread.getName().equals(Thread.currentThread().getName())) {

Review comment:
       So we should try removing any other thread, if one exists, and only allow removing the current thread if it's the last one? That sounds reasonable, let's go with that.
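
   To make the agreed policy concrete, here is a rough sketch, assuming thread names are unique and that the caller already holds whatever lock protects the list. `RemovalPolicy` and `pick` are hypothetical names for illustration, not part of `KafkaStreams`:

```java
import java.util.List;
import java.util.Optional;

// Hypothetical illustration of the selection policy discussed above.
class RemovalPolicy {

    // Picks the name of the thread to remove: any thread other than the caller
    // if one exists, otherwise the caller itself when it is the only one left.
    static Optional<String> pick(final List<String> aliveThreadNames, final String callerName) {
        for (final String name : aliveThreadNames) {
            if (!name.equals(callerName)) {
                return Optional.of(name);
            }
        }
        // No other thread was found, so the caller is removable only if it is in the list.
        return aliveThreadNames.contains(callerName)
            ? Optional.of(callerName)
            : Optional.empty();
    }
}
```

   When the method is called from a non-stream thread, the caller's name never matches, so the first alive thread is chosen.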







[GitHub] [kafka] ableegoldman commented on a change in pull request #9695: KAFKA-10500: Remove thread

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #9695:
URL: https://github.com/apache/kafka/pull/9695#discussion_r538923610



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -924,22 +924,64 @@ private StreamThread createStreamThread(final long cacheSizePerThread, final int
      * @return name of the added stream thread or empty if a new stream thread could not be added
      */
     public Optional<String> addStreamThread() {
-        synchronized (changeThreadCount) {
-            if (isRunningOrRebalancing()) {
-                final int threadIdx = getNextThreadIndex();
-                final long cacheSizePerThread = getCacheSizePerThread(threads.size() + 1);
+        if (isRunningOrRebalancing()) {
+            final int threadIdx;
+            final long cacheSizePerThread;
+            synchronized (changeThreadCount) {
+                threadIdx = getNextThreadIndex();
+                cacheSizePerThread = getCacheSizePerThread(threads.size() + 1);
                 resizeThreadCache(cacheSizePerThread);
-                final StreamThread streamThread = createStreamThread(cacheSizePerThread, threadIdx);
-                synchronized (stateLock) {
-                    if (isRunningOrRebalancing()) {
-                        streamThread.start();
-                        return Optional.of(streamThread.getName());
-                    } else {
-                        streamThread.shutdown();
+            }
+            final StreamThread streamThread = createStreamThread(cacheSizePerThread, threadIdx);
+
+            synchronized (stateLock) {
+                if (isRunningOrRebalancing()) {
+                    streamThread.start();
+                    return Optional.of(streamThread.getName());
+                } else {
+                    streamThread.shutdown();
+                    threads.remove(streamThread);
+                    resizeThreadCache(getCacheSizePerThread(threads.size()));
+                    return Optional.empty();
+                }
+            }
+        }
+        return Optional.empty();
+    }
+
+    /**
+     * Removes one stream thread out of the running stream threads from this Kafka Streams client.
+     * <p>
+     * The removed stream thread is gracefully shut down. This method does not specify which stream
+     * thread is shut down.
+     * <p>
+     * Since the number of stream threads decreases, the sizes of the caches in the remaining stream
+     * threads are adapted so that the sum of the cache sizes over all stream threads equals the total
+     * cache size specified in configuration {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG}.
+     *
+     * @return name of the removed stream thread or empty if a stream thread could not be removed because
+     *         no stream threads are alive
+     */
+    public Optional<String> removeStreamThread() {
+        if (isRunningOrRebalancing()) {
+            for (final StreamThread streamThread : threads) {
+                if (streamThread.isAlive()) {
+                    streamThread.shutdown();
+                    while (streamThread.state() != StreamThread.State.DEAD && !streamThread.getName().equals(Thread.currentThread().getName())) {
+                        try {
+                            synchronized (streamThread.state()) {
+                                streamThread.state().wait(100);
+                            }
+                        } catch (final InterruptedException e) {
+                            e.printStackTrace();

Review comment:
       Let's continue the discussion about what to do if the thread hasn't finished shutting down [here](https://github.com/apache/kafka/pull/9695#discussion_r538921676), since the same question applies to this case.
   
   As for swallowing the InterruptedException, it still makes me uncomfortable but maybe there was/is a good reason that we do it already. Or, maybe you're just the first person to notice that we're doing that.
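
   For reference, one common alternative to swallowing the exception is to restore the interrupt flag so that callers further up can still observe it. The sketch below is only an illustration under that assumption; `AwaitShutdown` and `awaitTermination` are hypothetical names, and it uses a bounded `Thread.join` rather than the `wait(100)` loop in the diff:

```java
// Hypothetical illustration, not code from KafkaStreams.
class AwaitShutdown {

    // Waits up to timeoutMs (must be > 0, since join(0) waits forever) for the
    // thread to terminate. Returns true if the thread is no longer alive on return.
    static boolean awaitTermination(final Thread thread, final long timeoutMs) {
        try {
            thread.join(timeoutMs);
        } catch (final InterruptedException e) {
            // Re-set the flag instead of printing the stack trace and moving on,
            // so the interruption is not lost to the calling code.
            Thread.currentThread().interrupt();
        }
        return !thread.isAlive();
    }
}
```

   This does not answer what to do when the timeout expires before the thread is dead; that is the open question in the linked discussion.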







[GitHub] [kafka] wcarlson5 commented on a change in pull request #9695: KAFKA-10500: Remove thread

Posted by GitBox <gi...@apache.org>.
wcarlson5 commented on a change in pull request #9695:
URL: https://github.com/apache/kafka/pull/9695#discussion_r555138077



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -992,7 +992,7 @@ private StreamThread createStreamThread(final long cacheSizePerThread, final int
         if (isRunningOrRebalancing()) {
             synchronized (changeThreadCount) {
                 for (final StreamThread streamThread : threads) {
-                    if (streamThread.isAlive()) {
+                    if (streamThread.isAlive() && (!streamThread.getName().equals(Thread.currentThread().getName()) || threads.size() == 1)) {

Review comment:
       There is a very small window between when a thread dies with the old handler and when it is removed from the list, so we have to check to make sure for now.
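
   As a small illustration of why the `isAlive()` check still matters, the sketch below mirrors the condition in the diff above on plain `Thread` objects. `AliveCandidate` and `pick` are hypothetical names, and it compares thread identity rather than names, which is a simplification:

```java
import java.util.List;
import java.util.Optional;

// Hypothetical illustration, not code from KafkaStreams.
class AliveCandidate {

    // Skips entries that have died but are still in the list, and skips the
    // caller unless it is the only entry in the list.
    static Optional<Thread> pick(final List<Thread> threads, final Thread caller) {
        for (final Thread thread : threads) {
            final boolean removable = thread != caller || threads.size() == 1;
            if (thread.isAlive() && removable) {
                return Optional.of(thread);
            }
        }
        return Optional.empty();
    }
}
```

   Without the liveness check, a thread caught in that window could be selected and "removed" even though it has already terminated.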



