Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/12/10 14:43:16 UTC

[GitHub] [kafka] cadonna commented on a change in pull request #9695: KAFKA-10500: Remove thread

cadonna commented on a change in pull request #9695:
URL: https://github.com/apache/kafka/pull/9695#discussion_r540148911



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -924,25 +924,64 @@ private StreamThread createStreamThread(final long cacheSizePerThread, final int
      * @return name of the added stream thread or empty if a new stream thread could not be added
      */
     public Optional<String> addStreamThread() {
-        synchronized (changeThreadCount) {
-            if (isRunningOrRebalancing()) {
-                final int threadIdx = getNextThreadIndex();
-                final long cacheSizePerThread = getCacheSizePerThread(threads.size() + 1);
+        if (isRunningOrRebalancing()) {
+            final int threadIdx;
+            final long cacheSizePerThread;
+            synchronized (changeThreadCount) {
+                threadIdx = getNextThreadIndex();
+                cacheSizePerThread = getCacheSizePerThread(threads.size() + 1);
                 resizeThreadCache(cacheSizePerThread);
-                final StreamThread streamThread = createStreamThread(cacheSizePerThread, threadIdx);
-                synchronized (stateLock) {
-                    if (isRunningOrRebalancing()) {
-                        streamThread.start();
-                        return Optional.of(streamThread.getName());
-                    } else {
-                        streamThread.shutdown();
+            }
+            final StreamThread streamThread = createStreamThread(cacheSizePerThread, threadIdx);
+
+            synchronized (stateLock) {
+                if (isRunningOrRebalancing()) {
+                    streamThread.start();
+                    return Optional.of(streamThread.getName());
+                } else {
+                    streamThread.shutdown();
+                    threads.remove(streamThread);
+                    resizeThreadCache(getCacheSizePerThread(threads.size()));
+                    log.warn("Cannot add a stream thread in state " + state());
+                    return Optional.empty();
+                }
+            }
+        }
+        log.warn("Cannot add a stream thread in state " + state());
+        return Optional.empty();
+    }
+
+    /**
+     * Removes one stream thread out of the running stream threads from this Kafka Streams client.
+     * <p>
+     * The removed stream thread is gracefully shut down. This method does not specify which stream
+     * thread is shut down.
+     * <p>
+     * Since the number of stream threads decreases, the sizes of the caches in the remaining stream
+     * threads are adapted so that the sum of the cache sizes over all stream threads equals the total
+     * cache size specified in configuration {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG}.
+     *
+     * @return name of the removed stream thread or empty if a stream thread could not be removed because
+     *         no stream threads are alive
+     */
+    public Optional<String> removeStreamThread() {
+        if (isRunningOrRebalancing()) {
+            for (final StreamThread streamThread : threads) {
+                if (streamThread.isAlive()) {
+                    streamThread.shutdown();
+                    if (!streamThread.getName().equals(Thread.currentThread().getName())) {
+                        streamThread.waitOnThreadState(StreamThread.State.DEAD);
+                    }
+                    synchronized (changeThreadCount) {
+                        final long cacheSizePerThread = threads.size() == 1 ? 0 : getCacheSizePerThread(threads.size() - 1);
+                        resizeThreadCache(cacheSizePerThread);

Review comment:
       Looking at these lines, I have two questions:
   1. Why do we not also resize the cache of the global stream thread?
   2. Why do we not do all computations regarding the cache size in `getCacheSizePerThread()`?
   
   Regarding 1, I think we also need to resize the cache of the global stream thread. Otherwise the global stream thread may get a disproportionately large share of the cache when stream threads are added beyond the initial number of stream threads and, which I think is worse, if all stream threads are removed, we would not use the whole cache for the global stream thread.
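
Regarding 2, a minimal sketch of how the whole computation could live in one method. `CacheSizing`, its parameters, and the zero-consumer behavior are hypothetical and not taken from the PR; the sketch assumes the global stream thread counts as one more consumer of the total cache.

```java
// Hypothetical helper, not the PR's code: keeps every cache-size decision in one place.
final class CacheSizing {

    private CacheSizing() { }

    /**
     * Splits the total cache evenly over all stream threads plus the
     * global stream thread (assumption: the global thread gets an equal share).
     */
    static long cacheSizePerThread(final long totalCacheSize,
                                   final int numStreamThreads,
                                   final boolean hasGlobalThread) {
        if (numStreamThreads < 0) {
            throw new IllegalArgumentException("numStreamThreads must not be negative");
        }
        final int consumers = numStreamThreads + (hasGlobalThread ? 1 : 0);
        if (consumers == 0) {
            // Nobody is left to use the cache, so there is nothing to hand out.
            return 0L;
        }
        return totalCacheSize / consumers;
    }
}
```

With a total of 100 bytes, four stream threads and a global stream thread, each thread would get 20 bytes. With zero stream threads the global stream thread would get all 100 bytes, so callers would no longer need the `threads.size() == 1 ? 0 : ...` special case.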

##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -924,25 +924,64 @@ private StreamThread createStreamThread(final long cacheSizePerThread, final int
      * @return name of the added stream thread or empty if a new stream thread could not be added
      */
     public Optional<String> addStreamThread() {
-        synchronized (changeThreadCount) {
-            if (isRunningOrRebalancing()) {
-                final int threadIdx = getNextThreadIndex();
-                final long cacheSizePerThread = getCacheSizePerThread(threads.size() + 1);
+        if (isRunningOrRebalancing()) {
+            final int threadIdx;
+            final long cacheSizePerThread;
+            synchronized (changeThreadCount) {
+                threadIdx = getNextThreadIndex();
+                cacheSizePerThread = getCacheSizePerThread(threads.size() + 1);
                 resizeThreadCache(cacheSizePerThread);
-                final StreamThread streamThread = createStreamThread(cacheSizePerThread, threadIdx);
-                synchronized (stateLock) {
-                    if (isRunningOrRebalancing()) {
-                        streamThread.start();
-                        return Optional.of(streamThread.getName());
-                    } else {
-                        streamThread.shutdown();
+            }
+            final StreamThread streamThread = createStreamThread(cacheSizePerThread, threadIdx);
+
+            synchronized (stateLock) {
+                if (isRunningOrRebalancing()) {
+                    streamThread.start();
+                    return Optional.of(streamThread.getName());
+                } else {
+                    streamThread.shutdown();
+                    threads.remove(streamThread);
+                    resizeThreadCache(getCacheSizePerThread(threads.size()));
+                    log.warn("Cannot add a stream thread in state " + state());
+                    return Optional.empty();
+                }
+            }
+        }
+        log.warn("Cannot add a stream thread in state " + state());
+        return Optional.empty();
+    }
+
+    /**
+     * Removes one stream thread out of the running stream threads from this Kafka Streams client.
+     * <p>
+     * The removed stream thread is gracefully shut down. This method does not specify which stream
+     * thread is shut down.
+     * <p>
+     * Since the number of stream threads decreases, the sizes of the caches in the remaining stream
+     * threads are adapted so that the sum of the cache sizes over all stream threads equals the total
+     * cache size specified in configuration {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG}.
+     *
+     * @return name of the removed stream thread or empty if a stream thread could not be removed because
+     *         no stream threads are alive
+     */
+    public Optional<String> removeStreamThread() {
+        if (isRunningOrRebalancing()) {
+            for (final StreamThread streamThread : threads) {
+                if (streamThread.isAlive()) {
+                    streamThread.shutdown();
+                    if (!streamThread.getName().equals(Thread.currentThread().getName())) {

Review comment:
       Is there a reason why we want to support a stream thread removing itself? If not, I would simply not allow it and check on line 970 whether the stream thread is alive and is not the calling thread. Not allowing a stream thread to remove itself would also ensure that we do not exceed the total cache size when we resize the cache.
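
A minimal sketch of the suggested check, using plain `java.lang.Thread` instead of `StreamThread`. `RemovalCandidates` and `firstRemovable` are hypothetical names, and the sketch compares thread identity where the PR compares thread names.

```java
import java.util.List;
import java.util.Optional;

// Hypothetical helper: picks a thread that may be removed by the caller.
final class RemovalCandidates {

    private RemovalCandidates() { }

    /** Returns the first thread that is alive and is not the calling thread. */
    static Optional<Thread> firstRemovable(final List<Thread> threads, final Thread caller) {
        return threads.stream()
            .filter(Thread::isAlive)   // skip threads that are not running
            .filter(t -> t != caller)  // a thread must not remove itself
            .findFirst();
    }
}
```

Because the caller is never selected, the removal path could always wait for the removed thread to die before the caches of the remaining threads are resized.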

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##########
@@ -597,6 +598,18 @@ public void setStreamsUncaughtExceptionHandler(final java.util.function.Consumer
         this.streamsUncaughtExceptionHandler = streamsUncaughtExceptionHandler;
     }
 
+    public void waitOnThreadState(final StreamThread.State targetState) {
+        synchronized (stateLock) {
+            while (state != targetState) {
+                try {
+                    stateLock.wait();
+                } catch (final InterruptedException e) {
+                    e.printStackTrace();

Review comment:
       Independent of how we decide to handle cleanup here, we should not print the stack trace. Please remove it.
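
One common alternative to printing the stack trace is to restore the interrupt flag and tell the caller that the wait was cut short. The sketch below is a standalone illustration under that assumption, not the handling agreed on in this PR; `StateWaiter` and its boolean return value are hypothetical.

```java
// Hypothetical stand-in for the state handling in StreamThread.
final class StateWaiter {

    enum State { RUNNING, DEAD }

    private final Object stateLock = new Object();
    private State state = State.RUNNING;

    void setState(final State newState) {
        synchronized (stateLock) {
            state = newState;
            stateLock.notifyAll(); // wake up every thread waiting on the state
        }
    }

    /** Returns true if the target state was reached, false if the wait was interrupted. */
    boolean waitOnState(final State targetState) {
        synchronized (stateLock) {
            while (state != targetState) {
                try {
                    stateLock.wait();
                } catch (final InterruptedException e) {
                    // Preserve the interrupt for the caller instead of printing it.
                    Thread.currentThread().interrupt();
                    return false;
                }
            }
            return true;
        }
    }
}
```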

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java
##########
@@ -108,4 +108,53 @@ public void shouldAddStreamThread() throws Exception {
             TestUtils.waitForCondition(() -> kafkaStreams.state() == KafkaStreams.State.RUNNING, "wait for running");
         }
     }
+
+    @Test
+    public void shouldRemoveStreamThread() throws Exception {
+        try (final KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), properties)) {
+            StreamsTestUtils.startKafkaStreamsAndWaitForRunningState(kafkaStreams);
+            final int oldThreadCount = kafkaStreams.localThreadsMetadata().size();
+            kafkaStreams.removeStreamThread();
+            assertThat(kafkaStreams.localThreadsMetadata().size(), equalTo(oldThreadCount - 1));
+        }
+    }
+
+    @Test
+    public void shouldAddAndRemoveStreamThreadsWhileKeepingNamesCorrect() throws Exception {
+        try (final KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), properties)) {
+            StreamsTestUtils.startKafkaStreamsAndWaitForRunningState(kafkaStreams);
+            int oldThreadCount = kafkaStreams.localThreadsMetadata().size();
+            assertThat(kafkaStreams.localThreadsMetadata().stream().map(t -> t.threadName().split("-StreamThread-")[1]).sorted().toArray(), equalTo(new String[] {"1", "2"}));

Review comment:
       Could you please add some line breaks? This and some of the other verifications are too long.

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java
##########
@@ -108,4 +108,53 @@ public void shouldAddStreamThread() throws Exception {
             TestUtils.waitForCondition(() -> kafkaStreams.state() == KafkaStreams.State.RUNNING, "wait for running");
         }
     }
+
+    @Test
+    public void shouldRemoveStreamThread() throws Exception {
+        try (final KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), properties)) {
+            StreamsTestUtils.startKafkaStreamsAndWaitForRunningState(kafkaStreams);
+            final int oldThreadCount = kafkaStreams.localThreadsMetadata().size();
+            kafkaStreams.removeStreamThread();
+            assertThat(kafkaStreams.localThreadsMetadata().size(), equalTo(oldThreadCount - 1));
+        }
+    }
+
+    @Test
+    public void shouldAddAndRemoveStreamThreadsWhileKeepingNamesCorrect() throws Exception {
+        try (final KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), properties)) {
+            StreamsTestUtils.startKafkaStreamsAndWaitForRunningState(kafkaStreams);
+            int oldThreadCount = kafkaStreams.localThreadsMetadata().size();
+            assertThat(kafkaStreams.localThreadsMetadata().stream().map(t -> t.threadName().split("-StreamThread-")[1]).sorted().toArray(), equalTo(new String[] {"1", "2"}));
+
+            final Optional<String> name = kafkaStreams.addStreamThread();
+
+            assertThat(name, CoreMatchers.not(Optional.empty()));

Review comment:
       nit: Since you statically imported the other matchers, you could also statically import this one to be consistent.

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java
##########
@@ -108,4 +108,53 @@ public void shouldAddStreamThread() throws Exception {
             TestUtils.waitForCondition(() -> kafkaStreams.state() == KafkaStreams.State.RUNNING, "wait for running");
         }
     }
+
+    @Test
+    public void shouldRemoveStreamThread() throws Exception {
+        try (final KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), properties)) {
+            StreamsTestUtils.startKafkaStreamsAndWaitForRunningState(kafkaStreams);
+            final int oldThreadCount = kafkaStreams.localThreadsMetadata().size();
+            kafkaStreams.removeStreamThread();
+            assertThat(kafkaStreams.localThreadsMetadata().size(), equalTo(oldThreadCount - 1));
+        }
+    }
+
+    @Test
+    public void shouldAddAndRemoveStreamThreadsWhileKeepingNamesCorrect() throws Exception {
+        try (final KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), properties)) {
+            StreamsTestUtils.startKafkaStreamsAndWaitForRunningState(kafkaStreams);
+            int oldThreadCount = kafkaStreams.localThreadsMetadata().size();
+            assertThat(kafkaStreams.localThreadsMetadata().stream().map(t -> t.threadName().split("-StreamThread-")[1]).sorted().toArray(), equalTo(new String[] {"1", "2"}));
+
+            final Optional<String> name = kafkaStreams.addStreamThread();
+
+            assertThat(name, CoreMatchers.not(Optional.empty()));
+            TestUtils.waitForCondition(
+                () -> kafkaStreams.localThreadsMetadata().stream().sequential()
+                        .map(ThreadMetadata::threadName).anyMatch(t -> t.equals(name.orElse(""))),

Review comment:
       Why not `name.get()` instead of `name.orElse("")`?
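
To illustrate the difference with plain JDK types: `orElse("")` turns an empty `Optional` into a name that never matches, while `get()` fails immediately. `ThreadNameLookup` and both method names are hypothetical.

```java
import java.util.List;
import java.util.Optional;

// Hypothetical helper contrasting the two ways of unwrapping the Optional.
final class ThreadNameLookup {

    private ThreadNameLookup() { }

    /** An empty Optional silently becomes "", which matches no real thread name. */
    static boolean lenientContains(final List<String> names, final Optional<String> name) {
        return names.contains(name.orElse(""));
    }

    /** An empty Optional throws NoSuchElementException, so the mistake surfaces at once. */
    static boolean strictContains(final List<String> names, final Optional<String> name) {
        return names.contains(name.get());
    }
}
```

In the test above, the preceding assertion already guarantees that `name` is not empty, so `get()` would be safe there.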

##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -924,25 +924,64 @@ private StreamThread createStreamThread(final long cacheSizePerThread, final int
      * @return name of the added stream thread or empty if a new stream thread could not be added
      */
     public Optional<String> addStreamThread() {
-        synchronized (changeThreadCount) {
-            if (isRunningOrRebalancing()) {
-                final int threadIdx = getNextThreadIndex();
-                final long cacheSizePerThread = getCacheSizePerThread(threads.size() + 1);
+        if (isRunningOrRebalancing()) {
+            final int threadIdx;
+            final long cacheSizePerThread;
+            synchronized (changeThreadCount) {
+                threadIdx = getNextThreadIndex();
+                cacheSizePerThread = getCacheSizePerThread(threads.size() + 1);
                 resizeThreadCache(cacheSizePerThread);
-                final StreamThread streamThread = createStreamThread(cacheSizePerThread, threadIdx);
-                synchronized (stateLock) {
-                    if (isRunningOrRebalancing()) {
-                        streamThread.start();
-                        return Optional.of(streamThread.getName());
-                    } else {
-                        streamThread.shutdown();
+            }
+            final StreamThread streamThread = createStreamThread(cacheSizePerThread, threadIdx);
+
+            synchronized (stateLock) {
+                if (isRunningOrRebalancing()) {
+                    streamThread.start();
+                    return Optional.of(streamThread.getName());
+                } else {
+                    streamThread.shutdown();
+                    threads.remove(streamThread);
+                    resizeThreadCache(getCacheSizePerThread(threads.size()));
+                    log.warn("Cannot add a stream thread in state " + state());
+                    return Optional.empty();
+                }
+            }
+        }
+        log.warn("Cannot add a stream thread in state " + state());
+        return Optional.empty();
+    }
+
+    /**
+     * Removes one stream thread out of the running stream threads from this Kafka Streams client.
+     * <p>
+     * The removed stream thread is gracefully shut down. This method does not specify which stream
+     * thread is shut down.
+     * <p>
+     * Since the number of stream threads decreases, the sizes of the caches in the remaining stream
+     * threads are adapted so that the sum of the cache sizes over all stream threads equals the total
+     * cache size specified in configuration {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG}.
+     *
+     * @return name of the removed stream thread or empty if a stream thread could not be removed because
+     *         no stream threads are alive
+     */
+    public Optional<String> removeStreamThread() {
+        if (isRunningOrRebalancing()) {
+            for (final StreamThread streamThread : threads) {
+                if (streamThread.isAlive()) {

Review comment:
       This might be a nit, but if we wait on the stream thread state below, why do we not check here that the stream thread state is not `DEAD`? That would be easier to understand when reading the code, IMO.

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java
##########
@@ -108,4 +108,53 @@ public void shouldAddStreamThread() throws Exception {
             TestUtils.waitForCondition(() -> kafkaStreams.state() == KafkaStreams.State.RUNNING, "wait for running");
         }
     }
+
+    @Test
+    public void shouldRemoveStreamThread() throws Exception {
+        try (final KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), properties)) {
+            StreamsTestUtils.startKafkaStreamsAndWaitForRunningState(kafkaStreams);
+            final int oldThreadCount = kafkaStreams.localThreadsMetadata().size();
+            kafkaStreams.removeStreamThread();
+            assertThat(kafkaStreams.localThreadsMetadata().size(), equalTo(oldThreadCount - 1));
+        }
+    }
+
+    @Test
+    public void shouldAddAndRemoveStreamThreadsWhileKeepingNamesCorrect() throws Exception {
+        try (final KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), properties)) {
+            StreamsTestUtils.startKafkaStreamsAndWaitForRunningState(kafkaStreams);
+            int oldThreadCount = kafkaStreams.localThreadsMetadata().size();
+            assertThat(kafkaStreams.localThreadsMetadata().stream().map(t -> t.threadName().split("-StreamThread-")[1]).sorted().toArray(), equalTo(new String[] {"1", "2"}));
+
+            final Optional<String> name = kafkaStreams.addStreamThread();
+
+            assertThat(name, CoreMatchers.not(Optional.empty()));
+            TestUtils.waitForCondition(
+                () -> kafkaStreams.localThreadsMetadata().stream().sequential()
+                        .map(ThreadMetadata::threadName).anyMatch(t -> t.equals(name.orElse(""))),
+                "Wait for the thread to be added"
+            );
+            assertThat(kafkaStreams.localThreadsMetadata().size(), equalTo(oldThreadCount + 1));
+            assertThat(kafkaStreams.localThreadsMetadata().stream().map(t -> t.threadName().split("-StreamThread-")[1]).sorted().toArray(), equalTo(new String[] {"1", "2", "3"}));
+            TestUtils.waitForCondition(() -> kafkaStreams.state() == KafkaStreams.State.RUNNING, "wait for running");
+
+
+            oldThreadCount = kafkaStreams.localThreadsMetadata().size();
+            kafkaStreams.removeStreamThread();
+            assertThat(kafkaStreams.localThreadsMetadata().size(), equalTo(oldThreadCount - 1));
+            assertThat(kafkaStreams.localThreadsMetadata().stream().map(t -> t.threadName().split("-StreamThread-")[1]).sorted().toArray(), equalTo(new String[] {"2", "3"}));

Review comment:
       This is not a guarantee that we give in the KIP. Assuming that the first stream thread is always removed is too strict for this test.
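
A sketch of a check that does not depend on which stream thread is removed. It only assumes that the test has the set of thread names before and after the removal and the name returned by `removeStreamThread()`; `RemovalCheck` is a hypothetical helper.

```java
import java.util.Set;

// Hypothetical helper: verifies a removal without fixing which thread was removed.
final class RemovalCheck {

    private RemovalCheck() { }

    static boolean removedExactlyOne(final Set<String> before,
                                     final Set<String> after,
                                     final String removed) {
        return before.contains(removed)            // the removed thread existed before
            && !after.contains(removed)            // and is gone afterwards
            && after.size() == before.size() - 1   // exactly one thread fewer
            && before.containsAll(after);          // and no unexpected new thread
    }
}
```

For thread indices `{"1", "2", "3"}` before the removal, this would accept `{"2", "3"}`, `{"1", "3"}` and `{"1", "2"}` afterwards, as long as the returned name is the one that is missing.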

##########
File path: streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
##########
@@ -631,6 +634,23 @@ public void shouldNotAddThreadWhenError() {
         assertThat(streams.threads.size(), equalTo(oldSize));
     }
 
+    @Test
+    public void shouldRemoveThread() throws InterruptedException {
+        props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2);
+        final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time);
+        streams.start();
+        final int oldSize = streams.threads.size();
+        TestUtils.waitForCondition(() -> streams.state() == KafkaStreams.State.RUNNING, 15L, "wait until running");
+        assertThat(streams.removeStreamThread(), equalTo(Optional.of("newThread")));
+        assertThat(streams.threads.size(), equalTo(oldSize - 1));
+    }
+
+    @Test
+    public void shouldNotRemoveThreadWhenNotRunning() {
+        props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1);
+        final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time);
+        assertThat(streams.removeStreamThread(), equalTo(Optional.empty()));
+    }
 

Review comment:
       A test is missing that verifies the behavior when a stream thread in state `DEAD` is in the list of stream threads.

##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -924,25 +924,64 @@ private StreamThread createStreamThread(final long cacheSizePerThread, final int
      * @return name of the added stream thread or empty if a new stream thread could not be added
      */
     public Optional<String> addStreamThread() {
-        synchronized (changeThreadCount) {
-            if (isRunningOrRebalancing()) {
-                final int threadIdx = getNextThreadIndex();
-                final long cacheSizePerThread = getCacheSizePerThread(threads.size() + 1);
+        if (isRunningOrRebalancing()) {
+            final int threadIdx;
+            final long cacheSizePerThread;
+            synchronized (changeThreadCount) {
+                threadIdx = getNextThreadIndex();
+                cacheSizePerThread = getCacheSizePerThread(threads.size() + 1);
                 resizeThreadCache(cacheSizePerThread);
-                final StreamThread streamThread = createStreamThread(cacheSizePerThread, threadIdx);
-                synchronized (stateLock) {
-                    if (isRunningOrRebalancing()) {
-                        streamThread.start();
-                        return Optional.of(streamThread.getName());
-                    } else {
-                        streamThread.shutdown();
+            }
+            final StreamThread streamThread = createStreamThread(cacheSizePerThread, threadIdx);
+
+            synchronized (stateLock) {
+                if (isRunningOrRebalancing()) {
+                    streamThread.start();
+                    return Optional.of(streamThread.getName());
+                } else {
+                    streamThread.shutdown();
+                    threads.remove(streamThread);
+                    resizeThreadCache(getCacheSizePerThread(threads.size()));
+                    log.warn("Cannot add a stream thread in state " + state());
+                    return Optional.empty();
+                }
+            }
+        }
+        log.warn("Cannot add a stream thread in state " + state());
+        return Optional.empty();
+    }
+
+    /**
+     * Removes one stream thread out of the running stream threads from this Kafka Streams client.
+     * <p>
+     * The removed stream thread is gracefully shut down. This method does not specify which stream
+     * thread is shut down.
+     * <p>
+     * Since the number of stream threads decreases, the sizes of the caches in the remaining stream
+     * threads are adapted so that the sum of the cache sizes over all stream threads equals the total
+     * cache size specified in configuration {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG}.
+     *
+     * @return name of the removed stream thread or empty if a stream thread could not be removed because
+     *         no stream threads are alive
+     */
+    public Optional<String> removeStreamThread() {
+        if (isRunningOrRebalancing()) {
+            for (final StreamThread streamThread : threads) {
+                if (streamThread.isAlive()) {
+                    streamThread.shutdown();
+                    if (!streamThread.getName().equals(Thread.currentThread().getName())) {
+                        streamThread.waitOnThreadState(StreamThread.State.DEAD);
+                    }
+                    synchronized (changeThreadCount) {
+                        final long cacheSizePerThread = threads.size() == 1 ? 0 : getCacheSizePerThread(threads.size() - 1);
+                        resizeThreadCache(cacheSizePerThread);
                         threads.remove(streamThread);
-                        resizeThreadCache(getCacheSizePerThread(threads.size()));
-                        return Optional.empty();
                     }
+                    return Optional.of(streamThread.getName());
                 }
             }
         }
+        log.warn("Cannot remove a stream thread in state " + state());

Review comment:
       Some more information about why we cannot remove a stream thread would be useful.

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java
##########
@@ -108,4 +108,53 @@ public void shouldAddStreamThread() throws Exception {
             TestUtils.waitForCondition(() -> kafkaStreams.state() == KafkaStreams.State.RUNNING, "wait for running");
         }
     }
+
+    @Test
+    public void shouldRemoveStreamThread() throws Exception {
+        try (final KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), properties)) {
+            StreamsTestUtils.startKafkaStreamsAndWaitForRunningState(kafkaStreams);
+            final int oldThreadCount = kafkaStreams.localThreadsMetadata().size();
+            kafkaStreams.removeStreamThread();
+            assertThat(kafkaStreams.localThreadsMetadata().size(), equalTo(oldThreadCount - 1));
+        }
+    }
+
+    @Test
+    public void shouldAddAndRemoveStreamThreadsWhileKeepingNamesCorrect() throws Exception {
+        try (final KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), properties)) {
+            StreamsTestUtils.startKafkaStreamsAndWaitForRunningState(kafkaStreams);
+            int oldThreadCount = kafkaStreams.localThreadsMetadata().size();
+            assertThat(kafkaStreams.localThreadsMetadata().stream().map(t -> t.threadName().split("-StreamThread-")[1]).sorted().toArray(), equalTo(new String[] {"1", "2"}));
+
+            final Optional<String> name = kafkaStreams.addStreamThread();
+
+            assertThat(name, CoreMatchers.not(Optional.empty()));
+            TestUtils.waitForCondition(
+                () -> kafkaStreams.localThreadsMetadata().stream().sequential()
+                        .map(ThreadMetadata::threadName).anyMatch(t -> t.equals(name.orElse(""))),

Review comment:
       nit: wrong indentation 

##########
File path: streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
##########
@@ -631,6 +634,23 @@ public void shouldNotAddThreadWhenError() {
         assertThat(streams.threads.size(), equalTo(oldSize));
     }
 
+    @Test
+    public void shouldRemoveThread() throws InterruptedException {
+        props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2);
+        final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time);
+        streams.start();
+        final int oldSize = streams.threads.size();
+        TestUtils.waitForCondition(() -> streams.state() == KafkaStreams.State.RUNNING, 15L, "wait until running");

Review comment:
       I think it would be better to use something like `Kafka Streams client did not reach state RUNNING` as condition details instead of `wait until running`. BTW, the line is too long. 😁 

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java
##########
@@ -108,4 +108,53 @@ public void shouldAddStreamThread() throws Exception {
             TestUtils.waitForCondition(() -> kafkaStreams.state() == KafkaStreams.State.RUNNING, "wait for running");
         }
     }
+
+    @Test
+    public void shouldRemoveStreamThread() throws Exception {
+        try (final KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), properties)) {
+            StreamsTestUtils.startKafkaStreamsAndWaitForRunningState(kafkaStreams);
+            final int oldThreadCount = kafkaStreams.localThreadsMetadata().size();
+            kafkaStreams.removeStreamThread();
+            assertThat(kafkaStreams.localThreadsMetadata().size(), equalTo(oldThreadCount - 1));
+        }
+    }
+
+    @Test
+    public void shouldAddAndRemoveStreamThreadsWhileKeepingNamesCorrect() throws Exception {
+        try (final KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), properties)) {
+            StreamsTestUtils.startKafkaStreamsAndWaitForRunningState(kafkaStreams);
+            int oldThreadCount = kafkaStreams.localThreadsMetadata().size();
+            assertThat(kafkaStreams.localThreadsMetadata().stream().map(t -> t.threadName().split("-StreamThread-")[1]).sorted().toArray(), equalTo(new String[] {"1", "2"}));
+
+            final Optional<String> name = kafkaStreams.addStreamThread();
+
+            assertThat(name, CoreMatchers.not(Optional.empty()));
+            TestUtils.waitForCondition(
+                () -> kafkaStreams.localThreadsMetadata().stream().sequential()
+                        .map(ThreadMetadata::threadName).anyMatch(t -> t.equals(name.orElse(""))),
+                "Wait for the thread to be added"

Review comment:
       IMO, this string should describe the failure that is reported when the condition is not met, like `stream thread has not been added`. The same applies to the other wait conditions.
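
The reason is easier to see with a small polling helper: the string only appears when the wait times out, so it reads best as a description of what went wrong. `Await.until` below is a hypothetical, simplified stand-in written for this illustration and is not Kafka's `TestUtils.waitForCondition`.

```java
import java.util.function.BooleanSupplier;

// Hypothetical, simplified polling helper for illustration only.
final class Await {

    private Await() { }

    static void until(final BooleanSupplier condition,
                      final long maxWaitMs,
                      final String failureDetails) {
        final long deadline = System.currentTimeMillis() + maxWaitMs;
        while (!condition.getAsBoolean()) {
            if (System.currentTimeMillis() >= deadline) {
                // The details string ends up in the failure message.
                throw new AssertionError(
                    "Condition not met within " + maxWaitMs + " ms: " + failureDetails);
            }
            try {
                Thread.sleep(10L);
            } catch (final InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new AssertionError("Interrupted while waiting: " + failureDetails);
            }
        }
    }
}
```

A failure then ends in "stream thread has not been added", which says what is wrong, whereas "Wait for the thread to be added" reads like an instruction.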

##########
File path: streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
##########
@@ -631,6 +634,23 @@ public void shouldNotAddThreadWhenError() {
         assertThat(streams.threads.size(), equalTo(oldSize));
     }
 
+    @Test
+    public void shouldRemoveThread() throws InterruptedException {
+        props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2);
+        final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time);
+        streams.start();
+        final int oldSize = streams.threads.size();
+        TestUtils.waitForCondition(() -> streams.state() == KafkaStreams.State.RUNNING, 15L, "wait until running");
+        assertThat(streams.removeStreamThread(), equalTo(Optional.of("newThread")));
+        assertThat(streams.threads.size(), equalTo(oldSize - 1));
+    }
+
+    @Test
+    public void shouldNotRemoveThreadWhenNotRunning() {
+        props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1);
+        final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time);
+        assertThat(streams.removeStreamThread(), equalTo(Optional.empty()));
+    }

Review comment:
       Why do you not also verify the number of stream threads here?

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java
##########
@@ -108,4 +108,53 @@ public void shouldAddStreamThread() throws Exception {
             TestUtils.waitForCondition(() -> kafkaStreams.state() == KafkaStreams.State.RUNNING, "wait for running");
         }
     }
+
+    @Test
+    public void shouldRemoveStreamThread() throws Exception {
+        try (final KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), properties)) {
+            StreamsTestUtils.startKafkaStreamsAndWaitForRunningState(kafkaStreams);
+            final int oldThreadCount = kafkaStreams.localThreadsMetadata().size();
+            kafkaStreams.removeStreamThread();
+            assertThat(kafkaStreams.localThreadsMetadata().size(), equalTo(oldThreadCount - 1));
+        }

Review comment:
       You should also verify the return value of `removeStreamThread()` here.

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java
##########
@@ -108,4 +108,53 @@ public void shouldAddStreamThread() throws Exception {
             TestUtils.waitForCondition(() -> kafkaStreams.state() == KafkaStreams.State.RUNNING, "wait for running");
         }
     }
+
+    @Test
+    public void shouldRemoveStreamThread() throws Exception {
+        try (final KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), properties)) {
+            StreamsTestUtils.startKafkaStreamsAndWaitForRunningState(kafkaStreams);
+            final int oldThreadCount = kafkaStreams.localThreadsMetadata().size();
+            kafkaStreams.removeStreamThread();
+            assertThat(kafkaStreams.localThreadsMetadata().size(), equalTo(oldThreadCount - 1));
+        }
+    }
+
+    @Test
+    public void shouldAddAndRemoveStreamThreadsWhileKeepingNamesCorrect() throws Exception {
+        try (final KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), properties)) {
+            StreamsTestUtils.startKafkaStreamsAndWaitForRunningState(kafkaStreams);
+            int oldThreadCount = kafkaStreams.localThreadsMetadata().size();
+            assertThat(kafkaStreams.localThreadsMetadata().stream().map(t -> t.threadName().split("-StreamThread-")[1]).sorted().toArray(), equalTo(new String[] {"1", "2"}));
+
+            final Optional<String> name = kafkaStreams.addStreamThread();
+
+            assertThat(name, CoreMatchers.not(Optional.empty()));
+            TestUtils.waitForCondition(
+                () -> kafkaStreams.localThreadsMetadata().stream().sequential()
+                        .map(ThreadMetadata::threadName).anyMatch(t -> t.equals(name.orElse(""))),
+                "Wait for the thread to be added"
+            );
+            assertThat(kafkaStreams.localThreadsMetadata().size(), equalTo(oldThreadCount + 1));
+            assertThat(kafkaStreams.localThreadsMetadata().stream().map(t -> t.threadName().split("-StreamThread-")[1]).sorted().toArray(), equalTo(new String[] {"1", "2", "3"}));
+            TestUtils.waitForCondition(() -> kafkaStreams.state() == KafkaStreams.State.RUNNING, "wait for running");
+
+
+            oldThreadCount = kafkaStreams.localThreadsMetadata().size();
+            kafkaStreams.removeStreamThread();
+            assertThat(kafkaStreams.localThreadsMetadata().size(), equalTo(oldThreadCount - 1));
+            assertThat(kafkaStreams.localThreadsMetadata().stream().map(t -> t.threadName().split("-StreamThread-")[1]).sorted().toArray(), equalTo(new String[] {"2", "3"}));
+
+            final Optional<String> name2 = kafkaStreams.addStreamThread();
+
+            assertThat(name, CoreMatchers.not(Optional.empty()));
+            TestUtils.waitForCondition(
+                () -> kafkaStreams.localThreadsMetadata().stream().sequential()
+                        .map(ThreadMetadata::threadName).anyMatch(t -> t.equals(name2.orElse(""))),
+                "Wait for the thread to be added"
+            );
+            assertThat(kafkaStreams.localThreadsMetadata().size(), equalTo(oldThreadCount));
+            assertThat(kafkaStreams.localThreadsMetadata().stream().map(t -> t.threadName().split("-StreamThread-")[1]).sorted().toArray(), equalTo(new String[] {"1", "2", "3"}));

Review comment:
       Here you should test whether the newly added stream thread gets the name of the stream thread that was removed before.
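
       A rough sketch of how the expected name could be computed for such a check. It assumes that a new thread takes the smallest unused index starting at 1, which is what the assertions in this test imply but which I have not copied from the implementation. `ThreadNameSketch`, `index`, and `expectedNextName` are hypothetical names used only for illustration.

```java
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical helper for illustration; not part of Kafka Streams.
final class ThreadNameSketch {

    // Separator the test above already splits thread names on.
    static final String SEPARATOR = "-StreamThread-";

    // Extracts the numeric suffix, e.g. "app-StreamThread-2" -> 2.
    static int index(final String threadName) {
        return Integer.parseInt(threadName.split(SEPARATOR)[1]);
    }

    // Assumption: the next thread gets the smallest index (from 1)
    // that no live thread is using.
    static String expectedNextName(final String prefix, final List<String> liveNames) {
        final Set<Integer> used = liveNames.stream()
            .map(ThreadNameSketch::index)
            .collect(Collectors.toSet());
        int candidate = 1;
        while (used.contains(candidate)) {
            candidate++;
        }
        return prefix + SEPARATOR + candidate;
    }
}
```

       With live threads 2 and 3 this yields the name ending in 1, i.e. the name of the removed thread, and with live threads 1 and 2 it yields the name ending in 3. The test could compare `name2` against that value instead of only checking that it is not empty.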

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java
##########
@@ -108,4 +108,53 @@ public void shouldAddStreamThread() throws Exception {
             TestUtils.waitForCondition(() -> kafkaStreams.state() == KafkaStreams.State.RUNNING, "wait for running");
         }
     }
+
+    @Test
+    public void shouldRemoveStreamThread() throws Exception {
+        try (final KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), properties)) {
+            StreamsTestUtils.startKafkaStreamsAndWaitForRunningState(kafkaStreams);
+            final int oldThreadCount = kafkaStreams.localThreadsMetadata().size();
+            kafkaStreams.removeStreamThread();
+            assertThat(kafkaStreams.localThreadsMetadata().size(), equalTo(oldThreadCount - 1));
+        }
+    }
+
+    @Test
+    public void shouldAddAndRemoveStreamThreadsWhileKeepingNamesCorrect() throws Exception {
+        try (final KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), properties)) {
+            StreamsTestUtils.startKafkaStreamsAndWaitForRunningState(kafkaStreams);
+            int oldThreadCount = kafkaStreams.localThreadsMetadata().size();
+            assertThat(kafkaStreams.localThreadsMetadata().stream().map(t -> t.threadName().split("-StreamThread-")[1]).sorted().toArray(), equalTo(new String[] {"1", "2"}));
+
+            final Optional<String> name = kafkaStreams.addStreamThread();
+
+            assertThat(name, CoreMatchers.not(Optional.empty()));
+            TestUtils.waitForCondition(
+                () -> kafkaStreams.localThreadsMetadata().stream().sequential()
+                        .map(ThreadMetadata::threadName).anyMatch(t -> t.equals(name.orElse(""))),
+                "Wait for the thread to be added"
+            );
+            assertThat(kafkaStreams.localThreadsMetadata().size(), equalTo(oldThreadCount + 1));
+            assertThat(kafkaStreams.localThreadsMetadata().stream().map(t -> t.threadName().split("-StreamThread-")[1]).sorted().toArray(), equalTo(new String[] {"1", "2", "3"}));
+            TestUtils.waitForCondition(() -> kafkaStreams.state() == KafkaStreams.State.RUNNING, "wait for running");
+
+
+            oldThreadCount = kafkaStreams.localThreadsMetadata().size();
+            kafkaStreams.removeStreamThread();

Review comment:
       Could you add a verification that the returned name is not empty?

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java
##########
@@ -108,4 +108,53 @@ public void shouldAddStreamThread() throws Exception {
             TestUtils.waitForCondition(() -> kafkaStreams.state() == KafkaStreams.State.RUNNING, "wait for running");
         }
     }
+
+    @Test
+    public void shouldRemoveStreamThread() throws Exception {
+        try (final KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), properties)) {
+            StreamsTestUtils.startKafkaStreamsAndWaitForRunningState(kafkaStreams);
+            final int oldThreadCount = kafkaStreams.localThreadsMetadata().size();
+            kafkaStreams.removeStreamThread();
+            assertThat(kafkaStreams.localThreadsMetadata().size(), equalTo(oldThreadCount - 1));
+        }
+    }
+
+    @Test
+    public void shouldAddAndRemoveStreamThreadsWhileKeepingNamesCorrect() throws Exception {
+        try (final KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), properties)) {
+            StreamsTestUtils.startKafkaStreamsAndWaitForRunningState(kafkaStreams);
+            int oldThreadCount = kafkaStreams.localThreadsMetadata().size();
+            assertThat(kafkaStreams.localThreadsMetadata().stream().map(t -> t.threadName().split("-StreamThread-")[1]).sorted().toArray(), equalTo(new String[] {"1", "2"}));
+
+            final Optional<String> name = kafkaStreams.addStreamThread();
+
+            assertThat(name, CoreMatchers.not(Optional.empty()));

Review comment:
       Why don't you verify the actual name? The KIP defines which name should be returned, and we know that the other two stream threads are named 1 and 2.

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java
##########
@@ -108,4 +108,53 @@ public void shouldAddStreamThread() throws Exception {
             TestUtils.waitForCondition(() -> kafkaStreams.state() == KafkaStreams.State.RUNNING, "wait for running");
         }
     }
+
+    @Test
+    public void shouldRemoveStreamThread() throws Exception {
+        try (final KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), properties)) {
+            StreamsTestUtils.startKafkaStreamsAndWaitForRunningState(kafkaStreams);
+            final int oldThreadCount = kafkaStreams.localThreadsMetadata().size();
+            kafkaStreams.removeStreamThread();
+            assertThat(kafkaStreams.localThreadsMetadata().size(), equalTo(oldThreadCount - 1));
+        }
+    }
+
+    @Test
+    public void shouldAddAndRemoveStreamThreadsWhileKeepingNamesCorrect() throws Exception {
+        try (final KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), properties)) {
+            StreamsTestUtils.startKafkaStreamsAndWaitForRunningState(kafkaStreams);
+            int oldThreadCount = kafkaStreams.localThreadsMetadata().size();
+            assertThat(kafkaStreams.localThreadsMetadata().stream().map(t -> t.threadName().split("-StreamThread-")[1]).sorted().toArray(), equalTo(new String[] {"1", "2"}));
+
+            final Optional<String> name = kafkaStreams.addStreamThread();
+
+            assertThat(name, CoreMatchers.not(Optional.empty()));
+            TestUtils.waitForCondition(
+                () -> kafkaStreams.localThreadsMetadata().stream().sequential()
+                        .map(ThreadMetadata::threadName).anyMatch(t -> t.equals(name.orElse(""))),
+                "Wait for the thread to be added"
+            );
+            assertThat(kafkaStreams.localThreadsMetadata().size(), equalTo(oldThreadCount + 1));
+            assertThat(kafkaStreams.localThreadsMetadata().stream().map(t -> t.threadName().split("-StreamThread-")[1]).sorted().toArray(), equalTo(new String[] {"1", "2", "3"}));
+            TestUtils.waitForCondition(() -> kafkaStreams.state() == KafkaStreams.State.RUNNING, "wait for running");
+
+
+            oldThreadCount = kafkaStreams.localThreadsMetadata().size();
+            kafkaStreams.removeStreamThread();

Review comment:
       Could you surround this call with empty lines as you did for the others? That makes the calls under test more visible.

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java
##########
@@ -108,4 +108,53 @@ public void shouldAddStreamThread() throws Exception {
             TestUtils.waitForCondition(() -> kafkaStreams.state() == KafkaStreams.State.RUNNING, "wait for running");
         }
     }
+
+    @Test
+    public void shouldRemoveStreamThread() throws Exception {
+        try (final KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), properties)) {
+            StreamsTestUtils.startKafkaStreamsAndWaitForRunningState(kafkaStreams);
+            final int oldThreadCount = kafkaStreams.localThreadsMetadata().size();
+            kafkaStreams.removeStreamThread();
+            assertThat(kafkaStreams.localThreadsMetadata().size(), equalTo(oldThreadCount - 1));
+        }
+    }
+
+    @Test
+    public void shouldAddAndRemoveStreamThreadsWhileKeepingNamesCorrect() throws Exception {
+        try (final KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), properties)) {
+            StreamsTestUtils.startKafkaStreamsAndWaitForRunningState(kafkaStreams);
+            int oldThreadCount = kafkaStreams.localThreadsMetadata().size();
+            assertThat(kafkaStreams.localThreadsMetadata().stream().map(t -> t.threadName().split("-StreamThread-")[1]).sorted().toArray(), equalTo(new String[] {"1", "2"}));
+
+            final Optional<String> name = kafkaStreams.addStreamThread();
+
+            assertThat(name, CoreMatchers.not(Optional.empty()));
+            TestUtils.waitForCondition(
+                () -> kafkaStreams.localThreadsMetadata().stream().sequential()
+                        .map(ThreadMetadata::threadName).anyMatch(t -> t.equals(name.orElse(""))),
+                "Wait for the thread to be added"
+            );
+            assertThat(kafkaStreams.localThreadsMetadata().size(), equalTo(oldThreadCount + 1));
+            assertThat(kafkaStreams.localThreadsMetadata().stream().map(t -> t.threadName().split("-StreamThread-")[1]).sorted().toArray(), equalTo(new String[] {"1", "2", "3"}));
+            TestUtils.waitForCondition(() -> kafkaStreams.state() == KafkaStreams.State.RUNNING, "wait for running");
+
+
+            oldThreadCount = kafkaStreams.localThreadsMetadata().size();
+            kafkaStreams.removeStreamThread();
+            assertThat(kafkaStreams.localThreadsMetadata().size(), equalTo(oldThreadCount - 1));
+            assertThat(kafkaStreams.localThreadsMetadata().stream().map(t -> t.threadName().split("-StreamThread-")[1]).sorted().toArray(), equalTo(new String[] {"2", "3"}));
+
+            final Optional<String> name2 = kafkaStreams.addStreamThread();
+
+            assertThat(name, CoreMatchers.not(Optional.empty()));

Review comment:
       You verify the wrong name here.
   ```suggestion
               assertThat(name2, CoreMatchers.not(Optional.empty()));
   ```

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java
##########
@@ -108,4 +108,53 @@ public void shouldAddStreamThread() throws Exception {
             TestUtils.waitForCondition(() -> kafkaStreams.state() == KafkaStreams.State.RUNNING, "wait for running");
         }
     }
+
+    @Test
+    public void shouldRemoveStreamThread() throws Exception {
+        try (final KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), properties)) {
+            StreamsTestUtils.startKafkaStreamsAndWaitForRunningState(kafkaStreams);
+            final int oldThreadCount = kafkaStreams.localThreadsMetadata().size();
+            kafkaStreams.removeStreamThread();
+            assertThat(kafkaStreams.localThreadsMetadata().size(), equalTo(oldThreadCount - 1));
+        }
+    }
+
+    @Test
+    public void shouldAddAndRemoveStreamThreadsWhileKeepingNamesCorrect() throws Exception {
+        try (final KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), properties)) {
+            StreamsTestUtils.startKafkaStreamsAndWaitForRunningState(kafkaStreams);
+            int oldThreadCount = kafkaStreams.localThreadsMetadata().size();
+            assertThat(kafkaStreams.localThreadsMetadata().stream().map(t -> t.threadName().split("-StreamThread-")[1]).sorted().toArray(), equalTo(new String[] {"1", "2"}));
+
+            final Optional<String> name = kafkaStreams.addStreamThread();
+
+            assertThat(name, CoreMatchers.not(Optional.empty()));
+            TestUtils.waitForCondition(
+                () -> kafkaStreams.localThreadsMetadata().stream().sequential()
+                        .map(ThreadMetadata::threadName).anyMatch(t -> t.equals(name.orElse(""))),
+                "Wait for the thread to be added"
+            );
+            assertThat(kafkaStreams.localThreadsMetadata().size(), equalTo(oldThreadCount + 1));
+            assertThat(kafkaStreams.localThreadsMetadata().stream().map(t -> t.threadName().split("-StreamThread-")[1]).sorted().toArray(), equalTo(new String[] {"1", "2", "3"}));
+            TestUtils.waitForCondition(() -> kafkaStreams.state() == KafkaStreams.State.RUNNING, "wait for running");
+

Review comment:
       Please remove the empty line.



