You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/11/18 18:55:07 UTC

[GitHub] [kafka] wcarlson5 opened a new pull request #9615: KAFKA-10500: Add thread option

wcarlson5 opened a new pull request #9615:
URL: https://github.com/apache/kafka/pull/9615


   Can add stream threads now
   
   replace https://github.com/apache/kafka/pull/9581 after rebase
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] cadonna commented on a change in pull request #9615: KAFKA-10500: Add thread option

Posted by GitBox <gi...@apache.org>.
cadonna commented on a change in pull request #9615:
URL: https://github.com/apache/kafka/pull/9615#discussion_r534261843



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -870,43 +899,73 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder,
                 cacheSizePerThread,
                 stateDirectory,
                 delegatingStateRestoreListener,
-                i + 1,
+                threadIdx,
                 KafkaStreams.this::closeToError,
-                this::defaultStreamsUncaughtExceptionHandler
-            );
-            threads.add(streamThread);
-            threadState.put(streamThread.getId(), streamThread.state());
-            storeProviders.add(new StreamThreadStateStoreProvider(streamThread));
-        }
+                streamsUncaughtExceptionHandler
+        );
+        streamThread.setStateListener(streamStateListener);
+        threads.add(streamThread);
+        threadState.put(streamThread.getId(), streamThread.state());
+        storeProviders.add(new StreamThreadStateStoreProvider(streamThread));
+        return streamThread;
+    }
 
-        ClientMetrics.addNumAliveStreamThreadMetric(streamsMetrics, (metricsConfig, now) ->
-            Math.toIntExact(threads.stream().filter(thread -> thread.state().isAlive()).count()));
+    /**
+     * Adds and starts a stream thread in addition to the stream threads that are already running in this
+     * Kafka Streams client.
+     * <p>
+     * Since the number of stream threads increases, the sizes of the caches in the new stream thread
+     * and the existing stream threads are adapted so that the sum of the cache sizes over all stream
+     * threads does not exceed the total cache size specified in configuration
+     * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG}.
+     * <p>
+     * Stream threads can only be added if this Kafka Streams client is in state RUNNING or REBALANCING.
+     *
+     * @return name of the added stream thread or empty if a new stream thread could not be added
+     */
+    public Optional<String> addStreamThread() {
+        if (isRunningOrRebalancing()) {
+            final int threadIdx = getNextThreadIndex();
+            final long cacheSizePerThread = getCacheSizePerThread(threads.size() + 1);
+            resizeThreadCache(cacheSizePerThread);
+            final StreamThread streamThread = createStreamThread(cacheSizePerThread, threadIdx);
+            synchronized (stateLock) {
+                if (isRunningOrRebalancing()) {
+                    streamThread.start();
+                    return Optional.of(streamThread.getName());
+                } else {
+                    threads.remove(streamThread);

Review comment:
       Good point about the shutdown of the stream thread!
   
   Actually, I did not want to have everything in the synchronized block because I thought blocking the client state more than needed was not a good idea. I thought decreasing the size of the cache might be costly if the evicted records are forwarded downstream. 
   Now that you mention to synchronize on a separate lock, I noticed that we probably need to put resize, start, and cleanup in the same synchronized block. The reason is that if two threads call `addStreamThread()` one after the other and the later thread passes
   
   ```
   final long cacheSizePerThread = getCacheSizePerThread(threads.size() + 1);
   ```
   
   before the earlier thread adds the new stream thread to `threads` in `createStreamThread()`, the later thread would compute the wrong cache size.
   
   So, I am in favor of having a separate lock that just synchronizes the threads calling `addStreamThread()`. Maybe we can simply synchronize the whole method (which means to synchronize with `start()` and `close()`).
   
   Still a minor issue seems to be the synchronization between`isRunningOrRebalancing()` and `streamThread.start()`. If between these two calls the Streams client transits to `ERROR` (the global stream thread died) an `IllegalStateException` would be thrown from the `StreamStateListener` because the Streams client would try to transit from `ERROR` to `REBALANCING`. But I guess that would also happen if the Streams client transits to `ERROR` before the new stream thread transits to `PARTITION_ASSIGNED` and calls the `StreamStateListener` that would transit the Streams client to `REBALANCING`. So it needs to be fixed somewhere else.
   
   Did I miss something? 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] cadonna commented on a change in pull request #9615: KAFKA-10500: Add thread option

Posted by GitBox <gi...@apache.org>.
cadonna commented on a change in pull request #9615:
URL: https://github.com/apache/kafka/pull/9615#discussion_r533675429



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -894,19 +885,88 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder,
         queryableStoreProvider = new QueryableStoreProvider(storeProviders, globalStateStoreProvider);
 
         stateDirCleaner = setupStateDirCleaner();
-        oldHandler = false;
         maybeWarnAboutCodeInRocksDBConfigSetter(log, config);
         rocksDBMetricsRecordingService = maybeCreateRocksDBMetricsRecordingService(clientId, config);
     }
 
+    private StreamThread createStreamThread(final long cacheSizePerThread, final int threadIdx) {
+        final StreamThread streamThread = StreamThread.create(
+                internalTopologyBuilder,
+                config,
+                clientSupplier,
+                adminClient,
+                processId,
+                clientId,
+                streamsMetrics,
+                time,
+                streamsMetadataState,
+                cacheSizePerThread,
+                stateDirectory,
+                delegatingStateRestoreListener,
+                threadIdx,
+                KafkaStreams.this::closeToError,
+                streamsUncaughtExceptionHandler
+        );
+        threads.add(streamThread);
+        threadState.put(streamThread.getId(), streamThread.state());
+        storeProviders.add(new StreamThreadStateStoreProvider(streamThread));
+        return streamThread;
+    }
+
+    /**
+     * Adds and starts a stream thread in addition to the stream threads that are already running in this
+     * Kafka Streams client.
+     *
+     * Since the number of stream threads increases, the sizes of the caches in the new stream thread
+     * and the existing stream threads are adapted so that the sum of the cache sizes over all stream
+     * threads does not exceed the total cache size specified in configuration
+     * {@code cache.max.bytes.buffering}.
+     *
+     * Stream threads can only be added if this Kafka Streams client is in state RUNNING or REBALANCING.
+     *
+     * @return name of the added stream thread or empty if a new stream thread could not be added
+     */
+    public Optional<String> addStreamThread() {
+        if (isRunningOrRebalancing()) {
+            final int threadIdx = getNextThreadIndex();
+            final long cacheSizePerThread = getCacheSizePerThread(threads.size() + 1);
+            resizeThreadCache(cacheSizePerThread);
+            final StreamThread streamThread = createStreamThread(cacheSizePerThread, threadIdx);
+            streamThread.setStateListener(streamStateListener);
+            synchronized (stateLock) {
+                if (isRunningOrRebalancing()) {
+                    streamThread.start();

Review comment:
       What about checking for the state and do the clean-up only if the state is not `PENDING_SHUTDOWN` and not `ERROR` and not `NOT_RUNNING`? In this way we are safe for future changes that break our assumption on state transitions and we make sure not to do unnecessary stuff when we are shutting down.   




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] wcarlson5 commented on a change in pull request #9615: KAFKA-10500: Add thread option

Posted by GitBox <gi...@apache.org>.
wcarlson5 commented on a change in pull request #9615:
URL: https://github.com/apache/kafka/pull/9615#discussion_r527099742



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -894,11 +904,57 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder,
         queryableStoreProvider = new QueryableStoreProvider(storeProviders, globalStateStoreProvider);
 
         stateDirCleaner = setupStateDirCleaner();
-        oldHandler = false;
         maybeWarnAboutCodeInRocksDBConfigSetter(log, config);
         rocksDBMetricsRecordingService = maybeCreateRocksDBMetricsRecordingService(clientId, config);
     }
 
+    /**
+     * Adds and starts a stream thread in addition to the stream threads that are already running in this
+     * Kafka Streams client.
+     *
+     * Since the number of stream threads increases, the sizes of the caches in the new stream thread
+     * and the existing stream threads are adapted so that the sum of the cache sizes over all stream
+     * threads does not exceed the total cache size specified in configuration
+     * {@code cache.max.bytes.buffering}.
+     *
+     * Stream threads can only be added if this Kafka Streams client is in state RUNNING or REBALANCING.
+     *
+     * @return name of the added stream thread or empty if a new stream thread could not be added
+     */
+    public Optional<String> addStreamThread() {
+        synchronized (stateLock) {
+            if (state == State.RUNNING || state == State.REBALANCING) {
+                final int threadIdx = threads.size() + 1;

Review comment:
       Looks like I didn't understand threadIdx. that makes sense now

##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -894,11 +904,57 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder,
         queryableStoreProvider = new QueryableStoreProvider(storeProviders, globalStateStoreProvider);
 
         stateDirCleaner = setupStateDirCleaner();
-        oldHandler = false;
         maybeWarnAboutCodeInRocksDBConfigSetter(log, config);
         rocksDBMetricsRecordingService = maybeCreateRocksDBMetricsRecordingService(clientId, config);
     }
 
+    /**
+     * Adds and starts a stream thread in addition to the stream threads that are already running in this
+     * Kafka Streams client.
+     *
+     * Since the number of stream threads increases, the sizes of the caches in the new stream thread
+     * and the existing stream threads are adapted so that the sum of the cache sizes over all stream
+     * threads does not exceed the total cache size specified in configuration
+     * {@code cache.max.bytes.buffering}.
+     *
+     * Stream threads can only be added if this Kafka Streams client is in state RUNNING or REBALANCING.
+     *
+     * @return name of the added stream thread or empty if a new stream thread could not be added
+     */
+    public Optional<String> addStreamThread() {
+        synchronized (stateLock) {
+            if (state == State.RUNNING || state == State.REBALANCING) {
+                final int threadIdx = threads.size() + 1;
+                final long cacheSizePerThread = getCacheSizePerThread(threadIdx);
+                resizeThreadCache(threadIdx);
+                final StreamThread streamThread = StreamThread.create(
+                        internalTopologyBuilder,
+                        config,
+                        clientSupplier,
+                        adminClient,
+                        processId,
+                        clientId,
+                        streamsMetrics,
+                        time,
+                        streamsMetadataState,
+                        cacheSizePerThread,
+                        stateDirectory,
+                        delegatingStateRestoreListener,
+                        threadIdx,
+                        KafkaStreams.this::closeToError,
+                        streamsUncaughtExceptionHandler
+                );
+                threads.add(streamThread);
+                threadState.put(streamThread.getId(), streamThread.state());
+                storeProviders.add(new StreamThreadStateStoreProvider(streamThread));
+                streamThread.setStateListener(streamStateListener);
+                return Optional.of(streamThread.getName());
+            } else {
+                return Optional.empty();
+            }
+        }
+    }

Review comment:
       right before the positive return




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] wcarlson5 commented on a change in pull request #9615: KAFKA-10500: Add thread option

Posted by GitBox <gi...@apache.org>.
wcarlson5 commented on a change in pull request #9615:
URL: https://github.com/apache/kafka/pull/9615#discussion_r527099779



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -894,11 +904,57 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder,
         queryableStoreProvider = new QueryableStoreProvider(storeProviders, globalStateStoreProvider);
 
         stateDirCleaner = setupStateDirCleaner();
-        oldHandler = false;
         maybeWarnAboutCodeInRocksDBConfigSetter(log, config);
         rocksDBMetricsRecordingService = maybeCreateRocksDBMetricsRecordingService(clientId, config);
     }
 
+    /**
+     * Adds and starts a stream thread in addition to the stream threads that are already running in this
+     * Kafka Streams client.
+     *
+     * Since the number of stream threads increases, the sizes of the caches in the new stream thread
+     * and the existing stream threads are adapted so that the sum of the cache sizes over all stream
+     * threads does not exceed the total cache size specified in configuration
+     * {@code cache.max.bytes.buffering}.
+     *
+     * Stream threads can only be added if this Kafka Streams client is in state RUNNING or REBALANCING.
+     *
+     * @return name of the added stream thread or empty if a new stream thread could not be added
+     */
+    public Optional<String> addStreamThread() {
+        synchronized (stateLock) {
+            if (state == State.RUNNING || state == State.REBALANCING) {
+                final int threadIdx = threads.size() + 1;
+                final long cacheSizePerThread = getCacheSizePerThread(threadIdx);
+                resizeThreadCache(threadIdx);
+                final StreamThread streamThread = StreamThread.create(
+                        internalTopologyBuilder,
+                        config,
+                        clientSupplier,
+                        adminClient,
+                        processId,
+                        clientId,
+                        streamsMetrics,
+                        time,
+                        streamsMetadataState,
+                        cacheSizePerThread,
+                        stateDirectory,
+                        delegatingStateRestoreListener,
+                        threadIdx,
+                        KafkaStreams.this::closeToError,
+                        streamsUncaughtExceptionHandler
+                );
+                threads.add(streamThread);
+                threadState.put(streamThread.getId(), streamThread.state());
+                storeProviders.add(new StreamThreadStateStoreProvider(streamThread));
+                streamThread.setStateListener(streamStateListener);
+                return Optional.of(streamThread.getName());
+            } else {
+                return Optional.empty();
+            }
+        }
+    }

Review comment:
       right before the positive return :)




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] wcarlson5 commented on a change in pull request #9615: KAFKA-10500: Add thread option

Posted by GitBox <gi...@apache.org>.
wcarlson5 commented on a change in pull request #9615:
URL: https://github.com/apache/kafka/pull/9615#discussion_r535468574



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -870,43 +900,75 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder,
                 cacheSizePerThread,
                 stateDirectory,
                 delegatingStateRestoreListener,
-                i + 1,
+                threadIdx,
                 KafkaStreams.this::closeToError,
-                this::defaultStreamsUncaughtExceptionHandler
-            );
-            threads.add(streamThread);
-            threadState.put(streamThread.getId(), streamThread.state());
-            storeProviders.add(new StreamThreadStateStoreProvider(streamThread));
-        }
+                streamsUncaughtExceptionHandler
+        );
+        streamThread.setStateListener(streamStateListener);
+        threads.add(streamThread);
+        threadState.put(streamThread.getId(), streamThread.state());
+        storeProviders.add(new StreamThreadStateStoreProvider(streamThread));
+        return streamThread;
+    }
 
-        ClientMetrics.addNumAliveStreamThreadMetric(streamsMetrics, (metricsConfig, now) ->
-            Math.toIntExact(threads.stream().filter(thread -> thread.state().isAlive()).count()));
+    /**
+     * Adds and starts a stream thread in addition to the stream threads that are already running in this
+     * Kafka Streams client.
+     * <p>
+     * Since the number of stream threads increases, the sizes of the caches in the new stream thread
+     * and the existing stream threads are adapted so that the sum of the cache sizes over all stream
+     * threads does not exceed the total cache size specified in configuration
+     * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG}.
+     * <p>
+     * Stream threads can only be added if this Kafka Streams client is in state RUNNING or REBALANCING.
+     *
+     * @return name of the added stream thread or empty if a new stream thread could not be added
+     */
+    public Optional<String> addStreamThread() {
+        synchronized (newThread) {
+            if (isRunningOrRebalancing()) {
+                final int threadIdx = getNextThreadIndex();
+                final long cacheSizePerThread = getCacheSizePerThread(threads.size() + 1);
+                resizeThreadCache(cacheSizePerThread);
+                final StreamThread streamThread = createStreamThread(cacheSizePerThread, threadIdx);
+                synchronized (stateLock) {

Review comment:
       Well newThread only syncs the addThread method. There is still the race condition between the second check of is running and starting the thread. It seems like a bad idea to leave that open as it could cause thread state changes when there shouldn't be. Starting the thread is relatively low cost so this shouldn't have much impact perf wise.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] cadonna commented on a change in pull request #9615: KAFKA-10500: Add thread option

Posted by GitBox <gi...@apache.org>.
cadonna commented on a change in pull request #9615:
URL: https://github.com/apache/kafka/pull/9615#discussion_r534263746



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -894,19 +885,88 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder,
         queryableStoreProvider = new QueryableStoreProvider(storeProviders, globalStateStoreProvider);
 
         stateDirCleaner = setupStateDirCleaner();
-        oldHandler = false;
         maybeWarnAboutCodeInRocksDBConfigSetter(log, config);
         rocksDBMetricsRecordingService = maybeCreateRocksDBMetricsRecordingService(clientId, config);
     }
 
+    private StreamThread createStreamThread(final long cacheSizePerThread, final int threadIdx) {
+        final StreamThread streamThread = StreamThread.create(
+                internalTopologyBuilder,
+                config,
+                clientSupplier,
+                adminClient,
+                processId,
+                clientId,
+                streamsMetrics,
+                time,
+                streamsMetadataState,
+                cacheSizePerThread,
+                stateDirectory,
+                delegatingStateRestoreListener,
+                threadIdx,
+                KafkaStreams.this::closeToError,
+                streamsUncaughtExceptionHandler
+        );
+        threads.add(streamThread);
+        threadState.put(streamThread.getId(), streamThread.state());
+        storeProviders.add(new StreamThreadStateStoreProvider(streamThread));
+        return streamThread;
+    }
+
+    /**
+     * Adds and starts a stream thread in addition to the stream threads that are already running in this
+     * Kafka Streams client.
+     *
+     * Since the number of stream threads increases, the sizes of the caches in the new stream thread
+     * and the existing stream threads are adapted so that the sum of the cache sizes over all stream
+     * threads does not exceed the total cache size specified in configuration
+     * {@code cache.max.bytes.buffering}.
+     *
+     * Stream threads can only be added if this Kafka Streams client is in state RUNNING or REBALANCING.
+     *
+     * @return name of the added stream thread or empty if a new stream thread could not be added
+     */
+    public Optional<String> addStreamThread() {
+        if (isRunningOrRebalancing()) {
+            final int threadIdx = getNextThreadIndex();
+            final long cacheSizePerThread = getCacheSizePerThread(threads.size() + 1);
+            resizeThreadCache(cacheSizePerThread);
+            final StreamThread streamThread = createStreamThread(cacheSizePerThread, threadIdx);
+            streamThread.setStateListener(streamStateListener);
+            synchronized (stateLock) {
+                if (isRunningOrRebalancing()) {
+                    streamThread.start();

Review comment:
       See my other comment https://github.com/apache/kafka/pull/9615#discussion_r534261843




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] wcarlson5 commented on a change in pull request #9615: KAFKA-10500: Add thread option

Posted by GitBox <gi...@apache.org>.
wcarlson5 commented on a change in pull request #9615:
URL: https://github.com/apache/kafka/pull/9615#discussion_r527277048



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -894,11 +904,72 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder,
         queryableStoreProvider = new QueryableStoreProvider(storeProviders, globalStateStoreProvider);
 
         stateDirCleaner = setupStateDirCleaner();
-        oldHandler = false;
         maybeWarnAboutCodeInRocksDBConfigSetter(log, config);
         rocksDBMetricsRecordingService = maybeCreateRocksDBMetricsRecordingService(clientId, config);
     }
 
+    /**
+     * Adds and starts a stream thread in addition to the stream threads that are already running in this
+     * Kafka Streams client.
+     *
+     * Since the number of stream threads increases, the sizes of the caches in the new stream thread
+     * and the existing stream threads are adapted so that the sum of the cache sizes over all stream
+     * threads does not exceed the total cache size specified in configuration
+     * {@code cache.max.bytes.buffering}.
+     *
+     * Stream threads can only be added if this Kafka Streams client is in state RUNNING or REBALANCING.
+     *
+     * @return name of the added stream thread or empty if a new stream thread could not be added
+     */
+    public Optional<String> addStreamThread() {
+        synchronized (stateLock) {

Review comment:
       Well we don't want it changing state while adding a thread




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] cadonna commented on a change in pull request #9615: KAFKA-10500: Add thread option

Posted by GitBox <gi...@apache.org>.
cadonna commented on a change in pull request #9615:
URL: https://github.com/apache/kafka/pull/9615#discussion_r529461975



##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java
##########
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.StreamsTestUtils;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Properties;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.apache.kafka.common.utils.Utils.mkObjectProperties;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.purgeLocalStreamsState;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+@Category(IntegrationTest.class)
+public class AdjustStreamThreadCountTest {
+
+    @ClassRule
+    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);
+
+    @Rule
+    public TestName testName = new TestName();
+
+    private static String inputTopic;
+    private static StreamsBuilder builder;
+    private static Properties properties;
+    private static String appId = "";
+
+    @Before
+    public void setup() {
+        final String testId = safeUniqueTestName(getClass(), testName);
+        appId = "appId_" + testId;
+        inputTopic = "input" + testId;
+        IntegrationTestUtils.cleanStateBeforeTest(CLUSTER, inputTopic);
+
+        builder  = new StreamsBuilder();
+        builder.stream(inputTopic);
+
+        properties  = mkObjectProperties(
+                mkMap(
+                        mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()),
+                        mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, appId),
+                        mkEntry(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()),
+                        mkEntry(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2),
+                        mkEntry(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class),
+                        mkEntry(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class)
+                )
+        );
+    }
+
+    @After
+    public void teardown() throws IOException {
+        purgeLocalStreamsState(properties);
+    }
+
+    private void produceMessages(final long timestamp, final String streamOneInput, final String msg) {
+        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
+                streamOneInput,
+                Collections.singletonList(new KeyValue<>("1", msg)),
+                TestUtils.producerConfig(
+                        CLUSTER.bootstrapServers(),
+                        StringSerializer.class,
+                        StringSerializer.class,
+                        new Properties()),
+                timestamp);

Review comment:
       wrong indentation

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java
##########
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.StreamsTestUtils;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Properties;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.apache.kafka.common.utils.Utils.mkObjectProperties;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.purgeLocalStreamsState;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+@Category(IntegrationTest.class)
+public class AdjustStreamThreadCountTest {
+
+    @ClassRule
+    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);
+
+    @Rule
+    public TestName testName = new TestName();
+
+    private static String inputTopic;
+    private static StreamsBuilder builder;
+    private static Properties properties;
+    private static String appId = "";
+
+    @Before
+    public void setup() {
+        final String testId = safeUniqueTestName(getClass(), testName);
+        appId = "appId_" + testId;
+        inputTopic = "input" + testId;
+        IntegrationTestUtils.cleanStateBeforeTest(CLUSTER, inputTopic);
+
+        builder  = new StreamsBuilder();
+        builder.stream(inputTopic);
+
+        properties  = mkObjectProperties(
+                mkMap(
+                        mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()),
+                        mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, appId),
+                        mkEntry(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()),
+                        mkEntry(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2),
+                        mkEntry(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class),
+                        mkEntry(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class)
+                )
+        );
+    }
+
+    @After
+    public void teardown() throws IOException {
+        purgeLocalStreamsState(properties);
+    }
+
+    private void produceMessages(final long timestamp, final String streamOneInput, final String msg) {
+        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
+                streamOneInput,
+                Collections.singletonList(new KeyValue<>("1", msg)),
+                TestUtils.producerConfig(
+                        CLUSTER.bootstrapServers(),
+                        StringSerializer.class,
+                        StringSerializer.class,
+                        new Properties()),
+                timestamp);
+    }
+
+    @Test
+    public void shouldAddStreamThread() throws Exception {
+        try (final KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), properties)) {
+            StreamsTestUtils.startKafkaStreamsAndWaitForRunningState(kafkaStreams);
+            final int oldThreadCount = kafkaStreams.localThreadsMetadata().size();
+
+            kafkaStreams.addStreamThread();
+            produceMessages(0L, inputTopic, "A");

Review comment:
       Do we need this line?

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java
##########
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.StreamsTestUtils;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Properties;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.apache.kafka.common.utils.Utils.mkObjectProperties;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.purgeLocalStreamsState;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+@Category(IntegrationTest.class)
+public class AdjustStreamThreadCountTest {
+
+    @ClassRule
+    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);
+
+    @Rule
+    public TestName testName = new TestName();
+
+    private static String inputTopic;
+    private static StreamsBuilder builder;
+    private static Properties properties;
+    private static String appId = "";
+
+    @Before
+    public void setup() {
+        final String testId = safeUniqueTestName(getClass(), testName);
+        appId = "appId_" + testId;
+        inputTopic = "input" + testId;
+        IntegrationTestUtils.cleanStateBeforeTest(CLUSTER, inputTopic);
+
+        builder  = new StreamsBuilder();
+        builder.stream(inputTopic);
+
+        properties  = mkObjectProperties(
+                mkMap(
+                        mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()),
+                        mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, appId),
+                        mkEntry(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()),
+                        mkEntry(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2),
+                        mkEntry(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class),
+                        mkEntry(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class)
+                )

Review comment:
       Please adjust indentation:
   
   ```suggestion
               mkMap(
                   mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()),
                   mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, appId),
                   mkEntry(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()),
                   mkEntry(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2),
                   mkEntry(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class),
                   mkEntry(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class)
               )
   ```

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java
##########
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.StreamsTestUtils;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Properties;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.apache.kafka.common.utils.Utils.mkObjectProperties;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.purgeLocalStreamsState;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+@Category(IntegrationTest.class)
+public class AdjustStreamThreadCountTest {
+
+    @ClassRule
+    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);
+
+    @Rule
+    public TestName testName = new TestName();
+
+    private static String inputTopic;
+    private static StreamsBuilder builder;
+    private static Properties properties;
+    private static String appId = "";
+
+    @Before
+    public void setup() {
+        final String testId = safeUniqueTestName(getClass(), testName);
+        appId = "appId_" + testId;
+        inputTopic = "input" + testId;
+        IntegrationTestUtils.cleanStateBeforeTest(CLUSTER, inputTopic);
+
+        builder  = new StreamsBuilder();
+        builder.stream(inputTopic);
+
+        properties  = mkObjectProperties(
+                mkMap(
+                        mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()),
+                        mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, appId),
+                        mkEntry(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()),
+                        mkEntry(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2),
+                        mkEntry(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class),
+                        mkEntry(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class)
+                )
+        );
+    }
+
+    @After
+    public void teardown() throws IOException {
+        purgeLocalStreamsState(properties);
+    }
+
+    private void produceMessages(final long timestamp, final String streamOneInput, final String msg) {
+        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
+                streamOneInput,
+                Collections.singletonList(new KeyValue<>("1", msg)),
+                TestUtils.producerConfig(
+                        CLUSTER.bootstrapServers(),
+                        StringSerializer.class,
+                        StringSerializer.class,
+                        new Properties()),
+                timestamp);
+    }
+
+    @Test
+    public void shouldAddStreamThread() throws Exception {
+        try (final KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), properties)) {
+            StreamsTestUtils.startKafkaStreamsAndWaitForRunningState(kafkaStreams);
+            final int oldThreadCount = kafkaStreams.localThreadsMetadata().size();
+
+            kafkaStreams.addStreamThread();
+            produceMessages(0L, inputTopic, "A");
+
+            assertThat(kafkaStreams.localThreadsMetadata().size(), equalTo(oldThreadCount + 1));

Review comment:
       We should wait for the new stream thread with a timeout, otherwise we risk that this test may become flaky.

##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -894,19 +885,88 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder,
         queryableStoreProvider = new QueryableStoreProvider(storeProviders, globalStateStoreProvider);
 
         stateDirCleaner = setupStateDirCleaner();
-        oldHandler = false;
         maybeWarnAboutCodeInRocksDBConfigSetter(log, config);
         rocksDBMetricsRecordingService = maybeCreateRocksDBMetricsRecordingService(clientId, config);
     }
 
+    private StreamThread createStreamThread(final long cacheSizePerThread, final int threadIdx) {
+        final StreamThread streamThread = StreamThread.create(
+                internalTopologyBuilder,
+                config,
+                clientSupplier,
+                adminClient,
+                processId,
+                clientId,
+                streamsMetrics,
+                time,
+                streamsMetadataState,
+                cacheSizePerThread,
+                stateDirectory,
+                delegatingStateRestoreListener,
+                threadIdx,
+                KafkaStreams.this::closeToError,
+                streamsUncaughtExceptionHandler
+        );
+        threads.add(streamThread);
+        threadState.put(streamThread.getId(), streamThread.state());
+        storeProviders.add(new StreamThreadStateStoreProvider(streamThread));
+        return streamThread;
+    }
+
+    /**
+     * Adds and starts a stream thread in addition to the stream threads that are already running in this
+     * Kafka Streams client.
+     *
+     * Since the number of stream threads increases, the sizes of the caches in the new stream thread
+     * and the existing stream threads are adapted so that the sum of the cache sizes over all stream
+     * threads does not exceed the total cache size specified in configuration
+     * {@code cache.max.bytes.buffering}.
+     *
+     * Stream threads can only be added if this Kafka Streams client is in state RUNNING or REBALANCING.
+     *
+     * @return name of the added stream thread or empty if a new stream thread could not be added
+     */
+    public Optional<String> addStreamThread() {
+        if (isRunningOrRebalancing()) {
+            final int threadIdx = getNextThreadIndex();
+            final long cacheSizePerThread = getCacheSizePerThread(threads.size() + 1);
+            resizeThreadCache(cacheSizePerThread);
+            final StreamThread streamThread = createStreamThread(cacheSizePerThread, threadIdx);
+            streamThread.setStateListener(streamStateListener);
+            synchronized (stateLock) {
+                if (isRunningOrRebalancing()) {
+                    streamThread.start();
+                } else {
+                    return Optional.empty();
+                }
+            }
+            return Optional.of(streamThread.getName());

Review comment:
       ```suggestion
               synchronized (stateLock) {
                   if (isRunningOrRebalancing()) {
                       streamThread.start();
                       return Optional.of(streamThread.getName());
                   } else {
                       return Optional.empty();
                   }
               }
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] cadonna commented on a change in pull request #9615: KAFKA-10500: Add thread option

Posted by GitBox <gi...@apache.org>.
cadonna commented on a change in pull request #9615:
URL: https://github.com/apache/kafka/pull/9615#discussion_r527530444



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -894,11 +885,77 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder,
         queryableStoreProvider = new QueryableStoreProvider(storeProviders, globalStateStoreProvider);
 
         stateDirCleaner = setupStateDirCleaner();
-        oldHandler = false;
         maybeWarnAboutCodeInRocksDBConfigSetter(log, config);
         rocksDBMetricsRecordingService = maybeCreateRocksDBMetricsRecordingService(clientId, config);
     }
 
+    private StreamThread makeThread(final long cacheSizePerThread, final int threadIdx) {
+        final StreamThread streamThread = StreamThread.create(
+                internalTopologyBuilder,
+                config,
+                clientSupplier,
+                adminClient,
+                processId,
+                clientId,
+                streamsMetrics,
+                time,
+                streamsMetadataState,
+                cacheSizePerThread,
+                stateDirectory,
+                delegatingStateRestoreListener,
+                threadIdx,
+                KafkaStreams.this::closeToError,
+                streamsUncaughtExceptionHandler
+        );
+        threads.add(streamThread);
+        threadState.put(streamThread.getId(), streamThread.state());
+        storeProviders.add(new StreamThreadStateStoreProvider(streamThread));
+        return streamThread;
+    }
+
+    /**
+     * Adds and starts a stream thread in addition to the stream threads that are already running in this
+     * Kafka Streams client.
+     *
+     * Since the number of stream threads increases, the sizes of the caches in the new stream thread
+     * and the existing stream threads are adapted so that the sum of the cache sizes over all stream
+     * threads does not exceed the total cache size specified in configuration
+     * {@code cache.max.bytes.buffering}.
+     *
+     * Stream threads can only be added if this Kafka Streams client is in state RUNNING or REBALANCING.
+     *
+     * @return name of the added stream thread or empty if a new stream thread could not be added
+     */
+    public Optional<String> addStreamThread() {
+        synchronized (stateLock) {
+            if (isRunningOrRebalancing()) {
+                final int threadIdx = getNextThreadIndex();
+                final long cacheSizePerThread = getCacheSizePerThread(threads.size() + 1);
+                resizeThreadCache(threads.size() + 1);

Review comment:
       I think it would be cleaner to pass `cacheSizePerThread` to `resizeThreadCache()` instead of the number of stream threads. We would then just call `getCacheSizePerThread()` once instead of once in `addStreamThread()` and once in `resizeThreadCache()`. We would also just need to compute `threads.size() + 1` once.    

##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -894,11 +885,77 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder,
         queryableStoreProvider = new QueryableStoreProvider(storeProviders, globalStateStoreProvider);
 
         stateDirCleaner = setupStateDirCleaner();
-        oldHandler = false;
         maybeWarnAboutCodeInRocksDBConfigSetter(log, config);
         rocksDBMetricsRecordingService = maybeCreateRocksDBMetricsRecordingService(clientId, config);
     }
 
+    private StreamThread makeThread(final long cacheSizePerThread, final int threadIdx) {

Review comment:
       IMO, `createStreamThread()` would describe the behavior better.

##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -894,11 +904,72 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder,
         queryableStoreProvider = new QueryableStoreProvider(storeProviders, globalStateStoreProvider);
 
         stateDirCleaner = setupStateDirCleaner();
-        oldHandler = false;
         maybeWarnAboutCodeInRocksDBConfigSetter(log, config);
         rocksDBMetricsRecordingService = maybeCreateRocksDBMetricsRecordingService(clientId, config);
     }
 
+    /**
+     * Adds and starts a stream thread in addition to the stream threads that are already running in this
+     * Kafka Streams client.
+     *
+     * Since the number of stream threads increases, the sizes of the caches in the new stream thread
+     * and the existing stream threads are adapted so that the sum of the cache sizes over all stream
+     * threads does not exceed the total cache size specified in configuration
+     * {@code cache.max.bytes.buffering}.
+     *
+     * Stream threads can only be added if this Kafka Streams client is in state RUNNING or REBALANCING.
+     *
+     * @return name of the added stream thread or empty if a new stream thread could not be added
+     */
+    public Optional<String> addStreamThread() {
+        synchronized (stateLock) {

Review comment:
       Wouldn't it be sufficent to check in the beginning if the Streams client is in `RUNNING` or `REBALANCING`, then optimistically create the stream thread, and before we start the stream thread check if the client is not in `PENDING_SHUTDOWN` and not in `ERROR` and not in `NOT_RUNNING`? State changes between `RUNNING` and `REBALANCING` should not affect adding a stream thread, right?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] wcarlson5 commented on a change in pull request #9615: KAFKA-10500: Add thread option

Posted by GitBox <gi...@apache.org>.
wcarlson5 commented on a change in pull request #9615:
URL: https://github.com/apache/kafka/pull/9615#discussion_r533574172



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -846,43 +856,24 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder,
                 time,
                 globalThreadId,
                 delegatingStateRestoreListener,
-                this::defaultStreamsUncaughtExceptionHandler
+                streamsUncaughtExceptionHandler
             );
             globalThreadState = globalStreamThread.state();
         }
 
         // use client id instead of thread client id since this admin client may be shared among threads
         adminClient = clientSupplier.getAdmin(config.getAdminConfigs(ClientUtils.getSharedAdminClientId(clientId)));
 
-        final Map<Long, StreamThread.State> threadState = new HashMap<>(numStreamThreads);
-        final ArrayList<StreamThreadStateStoreProvider> storeProviders = new ArrayList<>();
+        threadState = new HashMap<>(numStreamThreads);
+        storeProviders = new ArrayList<>();
         for (int i = 0; i < numStreamThreads; i++) {
-            final StreamThread streamThread = StreamThread.create(
-                internalTopologyBuilder,
-                config,
-                clientSupplier,
-                adminClient,
-                processId,
-                clientId,
-                streamsMetrics,
-                time,
-                streamsMetadataState,
-                cacheSizePerThread,
-                stateDirectory,
-                delegatingStateRestoreListener,
-                i + 1,
-                KafkaStreams.this::closeToError,
-                this::defaultStreamsUncaughtExceptionHandler
-            );
-            threads.add(streamThread);
-            threadState.put(streamThread.getId(), streamThread.state());
-            storeProviders.add(new StreamThreadStateStoreProvider(streamThread));
+            createStreamThread(cacheSizePerThread, i + 1);

Review comment:
       I think so

##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -398,6 +407,7 @@ public void setUncaughtExceptionHandler(final StreamsUncaughtExceptionHandler st
         final Consumer<Throwable> handler = exception -> handleStreamsUncaughtException(exception, streamsUncaughtExceptionHandler);
         synchronized (stateLock) {
             if (state == State.CREATED) {
+                this.streamsUncaughtExceptionHandler = handler;

Review comment:
       the `this.` is necessary. the parameter is the same name

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
##########
@@ -76,6 +76,9 @@ public void resize(final long newCacheSizeBytes) {
         final boolean shrink = newCacheSizeBytes < maxCacheSizeBytes;
         maxCacheSizeBytes = newCacheSizeBytes;
         if (shrink) {
+            if (caches.values().isEmpty()) {

Review comment:
       Apparently `CircularIterators` throw an error if they are made on empty lists. And if there are no caches to resize we don't need to bother with the rest of this.

##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -894,19 +885,88 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder,
         queryableStoreProvider = new QueryableStoreProvider(storeProviders, globalStateStoreProvider);
 
         stateDirCleaner = setupStateDirCleaner();
-        oldHandler = false;
         maybeWarnAboutCodeInRocksDBConfigSetter(log, config);
         rocksDBMetricsRecordingService = maybeCreateRocksDBMetricsRecordingService(clientId, config);
     }
 
+    private StreamThread createStreamThread(final long cacheSizePerThread, final int threadIdx) {
+        final StreamThread streamThread = StreamThread.create(
+                internalTopologyBuilder,
+                config,
+                clientSupplier,
+                adminClient,
+                processId,
+                clientId,
+                streamsMetrics,
+                time,
+                streamsMetadataState,
+                cacheSizePerThread,
+                stateDirectory,
+                delegatingStateRestoreListener,
+                threadIdx,
+                KafkaStreams.this::closeToError,
+                streamsUncaughtExceptionHandler
+        );
+        threads.add(streamThread);
+        threadState.put(streamThread.getId(), streamThread.state());
+        storeProviders.add(new StreamThreadStateStoreProvider(streamThread));
+        return streamThread;
+    }
+
+    /**
+     * Adds and starts a stream thread in addition to the stream threads that are already running in this
+     * Kafka Streams client.
+     *
+     * Since the number of stream threads increases, the sizes of the caches in the new stream thread
+     * and the existing stream threads are adapted so that the sum of the cache sizes over all stream
+     * threads does not exceed the total cache size specified in configuration
+     * {@code cache.max.bytes.buffering}.
+     *
+     * Stream threads can only be added if this Kafka Streams client is in state RUNNING or REBALANCING.
+     *
+     * @return name of the added stream thread or empty if a new stream thread could not be added
+     */
+    public Optional<String> addStreamThread() {
+        if (isRunningOrRebalancing()) {
+            final int threadIdx = getNextThreadIndex();
+            final long cacheSizePerThread = getCacheSizePerThread(threads.size() + 1);
+            resizeThreadCache(cacheSizePerThread);
+            final StreamThread streamThread = createStreamThread(cacheSizePerThread, threadIdx);
+            streamThread.setStateListener(streamStateListener);
+            synchronized (stateLock) {

Review comment:
       If it is not running or rebalancing we after it was already running or rebalancing on line 930 we know the client has stopped. The number of threads will be reset to the config and everything will be rebuilt anyways, so changing the cache size should not matter.
   
   The state lock is so that in between the second check and starting the thread the state does not change to pending shutdown or something else. I don't think its necessary to guard the whole method as the cache should be thrown out if it's not being started.

##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -894,19 +885,88 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder,
         queryableStoreProvider = new QueryableStoreProvider(storeProviders, globalStateStoreProvider);
 
         stateDirCleaner = setupStateDirCleaner();
-        oldHandler = false;
         maybeWarnAboutCodeInRocksDBConfigSetter(log, config);
         rocksDBMetricsRecordingService = maybeCreateRocksDBMetricsRecordingService(clientId, config);
     }
 
+    private StreamThread createStreamThread(final long cacheSizePerThread, final int threadIdx) {
+        final StreamThread streamThread = StreamThread.create(
+                internalTopologyBuilder,
+                config,
+                clientSupplier,
+                adminClient,
+                processId,
+                clientId,
+                streamsMetrics,
+                time,
+                streamsMetadataState,
+                cacheSizePerThread,
+                stateDirectory,
+                delegatingStateRestoreListener,
+                threadIdx,
+                KafkaStreams.this::closeToError,
+                streamsUncaughtExceptionHandler
+        );
+        threads.add(streamThread);
+        threadState.put(streamThread.getId(), streamThread.state());
+        storeProviders.add(new StreamThreadStateStoreProvider(streamThread));
+        return streamThread;
+    }
+
+    /**
+     * Adds and starts a stream thread in addition to the stream threads that are already running in this
+     * Kafka Streams client.
+     *
+     * Since the number of stream threads increases, the sizes of the caches in the new stream thread
+     * and the existing stream threads are adapted so that the sum of the cache sizes over all stream
+     * threads does not exceed the total cache size specified in configuration
+     * {@code cache.max.bytes.buffering}.

Review comment:
       I think that makes more sense.

##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -846,43 +856,24 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder,
                 time,
                 globalThreadId,
                 delegatingStateRestoreListener,
-                this::defaultStreamsUncaughtExceptionHandler
+                streamsUncaughtExceptionHandler
             );
             globalThreadState = globalStreamThread.state();
         }
 
         // use client id instead of thread client id since this admin client may be shared among threads
         adminClient = clientSupplier.getAdmin(config.getAdminConfigs(ClientUtils.getSharedAdminClientId(clientId)));
 
-        final Map<Long, StreamThread.State> threadState = new HashMap<>(numStreamThreads);
-        final ArrayList<StreamThreadStateStoreProvider> storeProviders = new ArrayList<>();
+        threadState = new HashMap<>(numStreamThreads);
+        storeProviders = new ArrayList<>();
         for (int i = 0; i < numStreamThreads; i++) {
-            final StreamThread streamThread = StreamThread.create(
-                internalTopologyBuilder,
-                config,
-                clientSupplier,
-                adminClient,
-                processId,
-                clientId,
-                streamsMetrics,
-                time,
-                streamsMetadataState,
-                cacheSizePerThread,
-                stateDirectory,
-                delegatingStateRestoreListener,
-                i + 1,
-                KafkaStreams.this::closeToError,
-                this::defaultStreamsUncaughtExceptionHandler
-            );
-            threads.add(streamThread);
-            threadState.put(streamThread.getId(), streamThread.state());
-            storeProviders.add(new StreamThreadStateStoreProvider(streamThread));
+            createStreamThread(cacheSizePerThread, i + 1);
         }
 
         ClientMetrics.addNumAliveStreamThreadMetric(streamsMetrics, (metricsConfig, now) ->
             Math.toIntExact(threads.stream().filter(thread -> thread.state().isAlive()).count()));
 
-        final StreamStateListener streamStateListener = new StreamStateListener(threadState, globalThreadState);
+        streamStateListener = new StreamStateListener(threadState, globalThreadState);

Review comment:
       good idea. I don't know why the `SteamStateListener` is created after the stream threads are made but it seems to work.

##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -894,19 +885,88 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder,
         queryableStoreProvider = new QueryableStoreProvider(storeProviders, globalStateStoreProvider);
 
         stateDirCleaner = setupStateDirCleaner();
-        oldHandler = false;
         maybeWarnAboutCodeInRocksDBConfigSetter(log, config);
         rocksDBMetricsRecordingService = maybeCreateRocksDBMetricsRecordingService(clientId, config);
     }
 
+    private StreamThread createStreamThread(final long cacheSizePerThread, final int threadIdx) {
+        final StreamThread streamThread = StreamThread.create(
+                internalTopologyBuilder,
+                config,
+                clientSupplier,
+                adminClient,
+                processId,
+                clientId,
+                streamsMetrics,
+                time,
+                streamsMetadataState,
+                cacheSizePerThread,
+                stateDirectory,
+                delegatingStateRestoreListener,
+                threadIdx,
+                KafkaStreams.this::closeToError,
+                streamsUncaughtExceptionHandler
+        );
+        threads.add(streamThread);
+        threadState.put(streamThread.getId(), streamThread.state());
+        storeProviders.add(new StreamThreadStateStoreProvider(streamThread));
+        return streamThread;
+    }
+
+    /**
+     * Adds and starts a stream thread in addition to the stream threads that are already running in this
+     * Kafka Streams client.
+     *
+     * Since the number of stream threads increases, the sizes of the caches in the new stream thread
+     * and the existing stream threads are adapted so that the sum of the cache sizes over all stream
+     * threads does not exceed the total cache size specified in configuration
+     * {@code cache.max.bytes.buffering}.
+     *
+     * Stream threads can only be added if this Kafka Streams client is in state RUNNING or REBALANCING.
+     *
+     * @return name of the added stream thread or empty if a new stream thread could not be added
+     */
+    public Optional<String> addStreamThread() {
+        if (isRunningOrRebalancing()) {
+            final int threadIdx = getNextThreadIndex();
+            final long cacheSizePerThread = getCacheSizePerThread(threads.size() + 1);
+            resizeThreadCache(cacheSizePerThread);
+            final StreamThread streamThread = createStreamThread(cacheSizePerThread, threadIdx);
+            streamThread.setStateListener(streamStateListener);
+            synchronized (stateLock) {
+                if (isRunningOrRebalancing()) {
+                    streamThread.start();
+                    return Optional.of(streamThread.getName());
+                } else {
+                    return Optional.empty();
+                }
+            }
+        } else {
+            return Optional.empty();
+        }
+    }
+
+    private int getNextThreadIndex() {
+        final HashSet<String> names = new HashSet<>();

Review comment:
       As threads are removed we want to reuse those names, so incrementing would not work for us. Maybe there is away to store a next name, but then the logic would have to be spread out in a few places and I prefer to just compute a few names.

##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -894,19 +885,88 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder,
         queryableStoreProvider = new QueryableStoreProvider(storeProviders, globalStateStoreProvider);
 
         stateDirCleaner = setupStateDirCleaner();
-        oldHandler = false;
         maybeWarnAboutCodeInRocksDBConfigSetter(log, config);
         rocksDBMetricsRecordingService = maybeCreateRocksDBMetricsRecordingService(clientId, config);
     }
 
+    private StreamThread createStreamThread(final long cacheSizePerThread, final int threadIdx) {
+        final StreamThread streamThread = StreamThread.create(
+                internalTopologyBuilder,
+                config,
+                clientSupplier,
+                adminClient,
+                processId,
+                clientId,
+                streamsMetrics,
+                time,
+                streamsMetadataState,
+                cacheSizePerThread,
+                stateDirectory,
+                delegatingStateRestoreListener,
+                threadIdx,
+                KafkaStreams.this::closeToError,
+                streamsUncaughtExceptionHandler
+        );
+        threads.add(streamThread);
+        threadState.put(streamThread.getId(), streamThread.state());
+        storeProviders.add(new StreamThreadStateStoreProvider(streamThread));
+        return streamThread;
+    }
+
+    /**
+     * Adds and starts a stream thread in addition to the stream threads that are already running in this
+     * Kafka Streams client.
+     *
+     * Since the number of stream threads increases, the sizes of the caches in the new stream thread
+     * and the existing stream threads are adapted so that the sum of the cache sizes over all stream
+     * threads does not exceed the total cache size specified in configuration
+     * {@code cache.max.bytes.buffering}.
+     *
+     * Stream threads can only be added if this Kafka Streams client is in state RUNNING or REBALANCING.
+     *
+     * @return name of the added stream thread or empty if a new stream thread could not be added
+     */
+    public Optional<String> addStreamThread() {
+        if (isRunningOrRebalancing()) {
+            final int threadIdx = getNextThreadIndex();
+            final long cacheSizePerThread = getCacheSizePerThread(threads.size() + 1);
+            resizeThreadCache(cacheSizePerThread);
+            final StreamThread streamThread = createStreamThread(cacheSizePerThread, threadIdx);
+            streamThread.setStateListener(streamStateListener);
+            synchronized (stateLock) {
+                if (isRunningOrRebalancing()) {
+                    streamThread.start();

Review comment:
       I think I explain this above. But we can remove from the thread list.

##########
File path: streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
##########
@@ -588,6 +592,25 @@ public void testCloseIsIdempotent() {
             closeCount, MockMetricsReporter.CLOSE_COUNT.get());
     }
 
+    @Test
+    public void shouldAddThread() throws InterruptedException {
+        props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1);
+        final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time);
+        streams.start();
+        final int oldSize = streams.threads.size();
+        TestUtils.waitForCondition(() -> streams.state() == KafkaStreams.State.RUNNING, 15L, "wait until running");
+        assertThat(streams.addStreamThread(), equalTo(Optional.of("newThread")));
+        assertThat(streams.threads.size(), equalTo(oldSize + 1));
+    }
+
+    @Test
+    public void shouldNotAddThread() {

Review comment:
       Good idea I'll add some tests.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] wcarlson5 commented on a change in pull request #9615: KAFKA-10500: Add thread option

Posted by GitBox <gi...@apache.org>.
wcarlson5 commented on a change in pull request #9615:
URL: https://github.com/apache/kafka/pull/9615#discussion_r533656732



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -894,19 +885,88 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder,
         queryableStoreProvider = new QueryableStoreProvider(storeProviders, globalStateStoreProvider);
 
         stateDirCleaner = setupStateDirCleaner();
-        oldHandler = false;
         maybeWarnAboutCodeInRocksDBConfigSetter(log, config);
         rocksDBMetricsRecordingService = maybeCreateRocksDBMetricsRecordingService(clientId, config);
     }
 
+    private StreamThread createStreamThread(final long cacheSizePerThread, final int threadIdx) {
+        final StreamThread streamThread = StreamThread.create(
+                internalTopologyBuilder,
+                config,
+                clientSupplier,
+                adminClient,
+                processId,
+                clientId,
+                streamsMetrics,
+                time,
+                streamsMetadataState,
+                cacheSizePerThread,
+                stateDirectory,
+                delegatingStateRestoreListener,
+                threadIdx,
+                KafkaStreams.this::closeToError,
+                streamsUncaughtExceptionHandler
+        );
+        threads.add(streamThread);
+        threadState.put(streamThread.getId(), streamThread.state());
+        storeProviders.add(new StreamThreadStateStoreProvider(streamThread));
+        return streamThread;
+    }
+
+    /**
+     * Adds and starts a stream thread in addition to the stream threads that are already running in this
+     * Kafka Streams client.
+     *
+     * Since the number of stream threads increases, the sizes of the caches in the new stream thread
+     * and the existing stream threads are adapted so that the sum of the cache sizes over all stream
+     * threads does not exceed the total cache size specified in configuration
+     * {@code cache.max.bytes.buffering}.
+     *
+     * Stream threads can only be added if this Kafka Streams client is in state RUNNING or REBALANCING.
+     *
+     * @return name of the added stream thread or empty if a new stream thread could not be added
+     */
+    public Optional<String> addStreamThread() {
+        if (isRunningOrRebalancing()) {
+            final int threadIdx = getNextThreadIndex();
+            final long cacheSizePerThread = getCacheSizePerThread(threads.size() + 1);
+            resizeThreadCache(cacheSizePerThread);
+            final StreamThread streamThread = createStreamThread(cacheSizePerThread, threadIdx);
+            streamThread.setStateListener(streamStateListener);
+            synchronized (stateLock) {
+                if (isRunningOrRebalancing()) {
+                    streamThread.start();

Review comment:
       There will be two more cases of remove. In the replace thread option and in the remove thread option.
   
   I'm not really convinced it is necessary but I don't see a problem with re-resizing the cache if we do not start the thread. I don't think there will be any side affects as the client should be shutting down, but if we resize there should be a little extra info in the state and store providers but it would not get used

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
##########
@@ -76,6 +76,9 @@ public void resize(final long newCacheSizeBytes) {
         final boolean shrink = newCacheSizeBytes < maxCacheSizeBytes;
         maxCacheSizeBytes = newCacheSizeBytes;
         if (shrink) {
+            if (caches.values().isEmpty()) {

Review comment:
       yeah, I didn't realize this was a problem, but when I added more test coverage it showed up

##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -894,19 +885,88 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder,
         queryableStoreProvider = new QueryableStoreProvider(storeProviders, globalStateStoreProvider);
 
         stateDirCleaner = setupStateDirCleaner();
-        oldHandler = false;
         maybeWarnAboutCodeInRocksDBConfigSetter(log, config);
         rocksDBMetricsRecordingService = maybeCreateRocksDBMetricsRecordingService(clientId, config);
     }
 
+    private StreamThread createStreamThread(final long cacheSizePerThread, final int threadIdx) {
+        final StreamThread streamThread = StreamThread.create(
+                internalTopologyBuilder,
+                config,
+                clientSupplier,
+                adminClient,
+                processId,
+                clientId,
+                streamsMetrics,
+                time,
+                streamsMetadataState,
+                cacheSizePerThread,
+                stateDirectory,
+                delegatingStateRestoreListener,
+                threadIdx,
+                KafkaStreams.this::closeToError,
+                streamsUncaughtExceptionHandler
+        );
+        threads.add(streamThread);
+        threadState.put(streamThread.getId(), streamThread.state());
+        storeProviders.add(new StreamThreadStateStoreProvider(streamThread));
+        return streamThread;
+    }
+
+    /**
+     * Adds and starts a stream thread in addition to the stream threads that are already running in this
+     * Kafka Streams client.
+     *
+     * Since the number of stream threads increases, the sizes of the caches in the new stream thread
+     * and the existing stream threads are adapted so that the sum of the cache sizes over all stream
+     * threads does not exceed the total cache size specified in configuration
+     * {@code cache.max.bytes.buffering}.
+     *
+     * Stream threads can only be added if this Kafka Streams client is in state RUNNING or REBALANCING.
+     *
+     * @return name of the added stream thread or empty if a new stream thread could not be added
+     */
+    public Optional<String> addStreamThread() {
+        if (isRunningOrRebalancing()) {
+            final int threadIdx = getNextThreadIndex();
+            final long cacheSizePerThread = getCacheSizePerThread(threads.size() + 1);
+            resizeThreadCache(cacheSizePerThread);
+            final StreamThread streamThread = createStreamThread(cacheSizePerThread, threadIdx);
+            streamThread.setStateListener(streamStateListener);
+            synchronized (stateLock) {
+                if (isRunningOrRebalancing()) {
+                    streamThread.start();
+                    return Optional.of(streamThread.getName());
+                } else {
+                    return Optional.empty();
+                }
+            }
+        } else {
+            return Optional.empty();
+        }
+    }
+
+    private int getNextThreadIndex() {
+        final HashSet<String> names = new HashSet<>();

Review comment:
       I'll remove a few of the unnecessary `+` operations then

##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -846,18 +856,37 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder,
                 time,
                 globalThreadId,
                 delegatingStateRestoreListener,
-                this::defaultStreamsUncaughtExceptionHandler
+                streamsUncaughtExceptionHandler
             );
             globalThreadState = globalStreamThread.state();
         }
 
         // use client id instead of thread client id since this admin client may be shared among threads
         adminClient = clientSupplier.getAdmin(config.getAdminConfigs(ClientUtils.getSharedAdminClientId(clientId)));
 
-        final Map<Long, StreamThread.State> threadState = new HashMap<>(numStreamThreads);
-        final ArrayList<StreamThreadStateStoreProvider> storeProviders = new ArrayList<>();
-        for (int i = 0; i < numStreamThreads; i++) {
-            final StreamThread streamThread = StreamThread.create(
+        threadState = new HashMap<>(numStreamThreads);
+        storeProviders = new ArrayList<>();
+        streamStateListener = new StreamStateListener(threadState, globalThreadState);
+        if (hasGlobalTopology) {
+            globalStreamThread.setStateListener(streamStateListener);
+        }
+        for (int i = 1; i <= numStreamThreads; i++) {
+            createStreamThread(cacheSizePerThread, i + 1);

Review comment:
       good catch. I don't think we make sure the thread index starts at 1. But let me fix that




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] wcarlson5 commented on a change in pull request #9615: KAFKA-10500: Add thread option

Posted by GitBox <gi...@apache.org>.
wcarlson5 commented on a change in pull request #9615:
URL: https://github.com/apache/kafka/pull/9615#discussion_r534339275



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -870,43 +899,73 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder,
                 cacheSizePerThread,
                 stateDirectory,
                 delegatingStateRestoreListener,
-                i + 1,
+                threadIdx,
                 KafkaStreams.this::closeToError,
-                this::defaultStreamsUncaughtExceptionHandler
-            );
-            threads.add(streamThread);
-            threadState.put(streamThread.getId(), streamThread.state());
-            storeProviders.add(new StreamThreadStateStoreProvider(streamThread));
-        }
+                streamsUncaughtExceptionHandler
+        );
+        streamThread.setStateListener(streamStateListener);
+        threads.add(streamThread);
+        threadState.put(streamThread.getId(), streamThread.state());
+        storeProviders.add(new StreamThreadStateStoreProvider(streamThread));
+        return streamThread;
+    }
 
-        ClientMetrics.addNumAliveStreamThreadMetric(streamsMetrics, (metricsConfig, now) ->
-            Math.toIntExact(threads.stream().filter(thread -> thread.state().isAlive()).count()));
+    /**
+     * Adds and starts a stream thread in addition to the stream threads that are already running in this
+     * Kafka Streams client.
+     * <p>
+     * Since the number of stream threads increases, the sizes of the caches in the new stream thread
+     * and the existing stream threads are adapted so that the sum of the cache sizes over all stream
+     * threads does not exceed the total cache size specified in configuration
+     * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG}.
+     * <p>
+     * Stream threads can only be added if this Kafka Streams client is in state RUNNING or REBALANCING.
+     *
+     * @return name of the added stream thread or empty if a new stream thread could not be added
+     */
+    public Optional<String> addStreamThread() {
+        if (isRunningOrRebalancing()) {
+            final int threadIdx = getNextThreadIndex();
+            final long cacheSizePerThread = getCacheSizePerThread(threads.size() + 1);
+            resizeThreadCache(cacheSizePerThread);
+            final StreamThread streamThread = createStreamThread(cacheSizePerThread, threadIdx);
+            synchronized (stateLock) {
+                if (isRunningOrRebalancing()) {
+                    streamThread.start();
+                    return Optional.of(streamThread.getName());
+                } else {
+                    threads.remove(streamThread);

Review comment:
       Oh okay, when I shutdown unstarted threads in a test I got a `java.lang.IllegalStateException Unexpected state transition`. But it looks like that is the client.
   
   I added a new lock for the add threads, and shutdown the thread. I think this address the problem you found with concurrent resizes. As well as @ableegoldman 's concerns




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] wcarlson5 commented on a change in pull request #9615: KAFKA-10500: Add thread option

Posted by GitBox <gi...@apache.org>.
wcarlson5 commented on a change in pull request #9615:
URL: https://github.com/apache/kafka/pull/9615#discussion_r533676976



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -894,19 +885,88 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder,
         queryableStoreProvider = new QueryableStoreProvider(storeProviders, globalStateStoreProvider);
 
         stateDirCleaner = setupStateDirCleaner();
-        oldHandler = false;
         maybeWarnAboutCodeInRocksDBConfigSetter(log, config);
         rocksDBMetricsRecordingService = maybeCreateRocksDBMetricsRecordingService(clientId, config);
     }
 
+    private StreamThread createStreamThread(final long cacheSizePerThread, final int threadIdx) {
+        final StreamThread streamThread = StreamThread.create(
+                internalTopologyBuilder,
+                config,
+                clientSupplier,
+                adminClient,
+                processId,
+                clientId,
+                streamsMetrics,
+                time,
+                streamsMetadataState,
+                cacheSizePerThread,
+                stateDirectory,
+                delegatingStateRestoreListener,
+                threadIdx,
+                KafkaStreams.this::closeToError,
+                streamsUncaughtExceptionHandler
+        );
+        threads.add(streamThread);
+        threadState.put(streamThread.getId(), streamThread.state());
+        storeProviders.add(new StreamThreadStateStoreProvider(streamThread));
+        return streamThread;
+    }
+
+    /**
+     * Adds and starts a stream thread in addition to the stream threads that are already running in this
+     * Kafka Streams client.
+     *
+     * Since the number of stream threads increases, the sizes of the caches in the new stream thread
+     * and the existing stream threads are adapted so that the sum of the cache sizes over all stream
+     * threads does not exceed the total cache size specified in configuration
+     * {@code cache.max.bytes.buffering}.
+     *
+     * Stream threads can only be added if this Kafka Streams client is in state RUNNING or REBALANCING.
+     *
+     * @return name of the added stream thread or empty if a new stream thread could not be added
+     */
+    public Optional<String> addStreamThread() {
+        if (isRunningOrRebalancing()) {
+            final int threadIdx = getNextThreadIndex();
+            final long cacheSizePerThread = getCacheSizePerThread(threads.size() + 1);
+            resizeThreadCache(cacheSizePerThread);
+            final StreamThread streamThread = createStreamThread(cacheSizePerThread, threadIdx);
+            streamThread.setStateListener(streamStateListener);
+            synchronized (stateLock) {
+                if (isRunningOrRebalancing()) {
+                    streamThread.start();

Review comment:
       From running or rebalancing aren't those the only states we can get to?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] wcarlson5 commented on a change in pull request #9615: KAFKA-10500: Add thread option

Posted by GitBox <gi...@apache.org>.
wcarlson5 commented on a change in pull request #9615:
URL: https://github.com/apache/kafka/pull/9615#discussion_r535477229



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -870,43 +900,75 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder,
                 cacheSizePerThread,
                 stateDirectory,
                 delegatingStateRestoreListener,
-                i + 1,
+                threadIdx,
                 KafkaStreams.this::closeToError,
-                this::defaultStreamsUncaughtExceptionHandler
-            );
-            threads.add(streamThread);
-            threadState.put(streamThread.getId(), streamThread.state());
-            storeProviders.add(new StreamThreadStateStoreProvider(streamThread));
-        }
+                streamsUncaughtExceptionHandler
+        );
+        streamThread.setStateListener(streamStateListener);
+        threads.add(streamThread);
+        threadState.put(streamThread.getId(), streamThread.state());
+        storeProviders.add(new StreamThreadStateStoreProvider(streamThread));
+        return streamThread;
+    }
 
-        ClientMetrics.addNumAliveStreamThreadMetric(streamsMetrics, (metricsConfig, now) ->
-            Math.toIntExact(threads.stream().filter(thread -> thread.state().isAlive()).count()));
+    /**
+     * Adds and starts a stream thread in addition to the stream threads that are already running in this
+     * Kafka Streams client.
+     * <p>
+     * Since the number of stream threads increases, the sizes of the caches in the new stream thread
+     * and the existing stream threads are adapted so that the sum of the cache sizes over all stream
+     * threads does not exceed the total cache size specified in configuration
+     * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG}.
+     * <p>
+     * Stream threads can only be added if this Kafka Streams client is in state RUNNING or REBALANCING.
+     *
+     * @return name of the added stream thread or empty if a new stream thread could not be added
+     */
+    public Optional<String> addStreamThread() {
+        synchronized (newThread) {
+            if (isRunningOrRebalancing()) {
+                final int threadIdx = getNextThreadIndex();
+                final long cacheSizePerThread = getCacheSizePerThread(threads.size() + 1);
+                resizeThreadCache(cacheSizePerThread);
+                final StreamThread streamThread = createStreamThread(cacheSizePerThread, threadIdx);
+                synchronized (stateLock) {
+                    if (isRunningOrRebalancing()) {
+                        streamThread.start();
+                        return Optional.of(streamThread.getName());
+                    } else {
+                        streamThread.shutdown();
+                        threads.remove(streamThread);
+                        resizeThreadCache(getCacheSizePerThread(threads.size()));
+                        return Optional.empty();
+                    }
+                }
+            }
+        }
+        return Optional.empty();
+    }
 
-        final StreamStateListener streamStateListener = new StreamStateListener(threadState, globalThreadState);
-        if (hasGlobalTopology) {
-            globalStreamThread.setStateListener(streamStateListener);
+    private int getNextThreadIndex() {
+        final HashSet<String> names = new HashSet<>();
+        for (final StreamThread streamThread: threads) {
+            names.add(streamThread.getName());
         }
-        for (final StreamThread thread : threads) {
-            thread.setStateListener(streamStateListener);
+        final String baseName = clientId + "-StreamThread-";
+        for (int i = 0; i < threads.size(); i++) {

Review comment:
       Sure, I missed that suggestion




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] wcarlson5 commented on a change in pull request #9615: KAFKA-10500: Add thread option

Posted by GitBox <gi...@apache.org>.
wcarlson5 commented on a change in pull request #9615:
URL: https://github.com/apache/kafka/pull/9615#discussion_r535471146



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -870,43 +900,75 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder,
                 cacheSizePerThread,
                 stateDirectory,
                 delegatingStateRestoreListener,
-                i + 1,
+                threadIdx,
                 KafkaStreams.this::closeToError,
-                this::defaultStreamsUncaughtExceptionHandler
-            );
-            threads.add(streamThread);
-            threadState.put(streamThread.getId(), streamThread.state());
-            storeProviders.add(new StreamThreadStateStoreProvider(streamThread));
-        }
+                streamsUncaughtExceptionHandler

Review comment:
       ah good catch. the diff makes that hard to see as it was actually moved to a new method.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mjsax commented on a change in pull request #9615: KAFKA-10500: Add thread option

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #9615:
URL: https://github.com/apache/kafka/pull/9615#discussion_r533627143



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -846,43 +856,24 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder,
                 time,
                 globalThreadId,
                 delegatingStateRestoreListener,
-                this::defaultStreamsUncaughtExceptionHandler
+                streamsUncaughtExceptionHandler
             );
             globalThreadState = globalStreamThread.state();
         }
 
         // use client id instead of thread client id since this admin client may be shared among threads
         adminClient = clientSupplier.getAdmin(config.getAdminConfigs(ClientUtils.getSharedAdminClientId(clientId)));
 
-        final Map<Long, StreamThread.State> threadState = new HashMap<>(numStreamThreads);
-        final ArrayList<StreamThreadStateStoreProvider> storeProviders = new ArrayList<>();
+        threadState = new HashMap<>(numStreamThreads);
+        storeProviders = new ArrayList<>();
         for (int i = 0; i < numStreamThreads; i++) {
-            final StreamThread streamThread = StreamThread.create(
-                internalTopologyBuilder,
-                config,
-                clientSupplier,
-                adminClient,
-                processId,
-                clientId,
-                streamsMetrics,
-                time,
-                streamsMetadataState,
-                cacheSizePerThread,
-                stateDirectory,
-                delegatingStateRestoreListener,
-                i + 1,
-                KafkaStreams.this::closeToError,
-                this::defaultStreamsUncaughtExceptionHandler
-            );
-            threads.add(streamThread);
-            threadState.put(streamThread.getId(), streamThread.state());
-            storeProviders.add(new StreamThreadStateStoreProvider(streamThread));
+            createStreamThread(cacheSizePerThread, i + 1);
         }
 
         ClientMetrics.addNumAliveStreamThreadMetric(streamsMetrics, (metricsConfig, now) ->
             Math.toIntExact(threads.stream().filter(thread -> thread.state().isAlive()).count()));
 
-        final StreamStateListener streamStateListener = new StreamStateListener(threadState, globalThreadState);
+        streamStateListener = new StreamStateListener(threadState, globalThreadState);

Review comment:
       Maybe we had some cyclic dependency at some point in the past? Not sure.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] wcarlson5 commented on a change in pull request #9615: KAFKA-10500: Add thread option

Posted by GitBox <gi...@apache.org>.
wcarlson5 commented on a change in pull request #9615:
URL: https://github.com/apache/kafka/pull/9615#discussion_r529709595



##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java
##########
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.StreamsTestUtils;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Properties;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.apache.kafka.common.utils.Utils.mkObjectProperties;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.purgeLocalStreamsState;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+@Category(IntegrationTest.class)
+public class AdjustStreamThreadCountTest {
+
+    @ClassRule
+    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);
+
+    @Rule
+    public TestName testName = new TestName();
+
+    private static String inputTopic;
+    private static StreamsBuilder builder;
+    private static Properties properties;
+    private static String appId = "";
+
+    @Before
+    public void setup() {
+        final String testId = safeUniqueTestName(getClass(), testName);
+        appId = "appId_" + testId;
+        inputTopic = "input" + testId;
+        IntegrationTestUtils.cleanStateBeforeTest(CLUSTER, inputTopic);
+
+        builder  = new StreamsBuilder();
+        builder.stream(inputTopic);
+
+        properties  = mkObjectProperties(
+                mkMap(
+                        mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()),
+                        mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, appId),
+                        mkEntry(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()),
+                        mkEntry(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2),
+                        mkEntry(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class),
+                        mkEntry(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class)
+                )
+        );
+    }
+
+    @After
+    public void teardown() throws IOException {
+        purgeLocalStreamsState(properties);
+    }
+
+    private void produceMessages(final long timestamp, final String streamOneInput, final String msg) {
+        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
+                streamOneInput,
+                Collections.singletonList(new KeyValue<>("1", msg)),
+                TestUtils.producerConfig(
+                        CLUSTER.bootstrapServers(),
+                        StringSerializer.class,
+                        StringSerializer.class,
+                        new Properties()),
+                timestamp);
+    }
+
+    @Test
+    public void shouldAddStreamThread() throws Exception {
+        try (final KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), properties)) {
+            StreamsTestUtils.startKafkaStreamsAndWaitForRunningState(kafkaStreams);
+            final int oldThreadCount = kafkaStreams.localThreadsMetadata().size();
+
+            kafkaStreams.addStreamThread();
+            produceMessages(0L, inputTopic, "A");

Review comment:
       no, we don't




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mjsax commented on a change in pull request #9615: KAFKA-10500: Add thread option

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #9615:
URL: https://github.com/apache/kafka/pull/9615#discussion_r536285505



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -845,67 +856,118 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder,
                 time,
                 globalThreadId,
                 delegatingStateRestoreListener,
-                this::defaultStreamsUncaughtExceptionHandler
+                streamsUncaughtExceptionHandler
             );
             globalThreadState = globalStreamThread.state();
         }
 
         // use client id instead of thread client id since this admin client may be shared among threads
         adminClient = clientSupplier.getAdmin(config.getAdminConfigs(ClientUtils.getSharedAdminClientId(clientId)));
 
-        final Map<Long, StreamThread.State> threadState = new HashMap<>(numStreamThreads);
-        final ArrayList<StreamThreadStateStoreProvider> storeProviders = new ArrayList<>();
-        for (int i = 0; i < numStreamThreads; i++) {
-            final StreamThread streamThread = StreamThread.create(
-                internalTopologyBuilder,
-                config,
-                clientSupplier,
-                adminClient,
-                processId,
-                clientId,
-                streamsMetrics,
-                time,
-                streamsMetadataState,
-                cacheSizePerThread,
-                stateDirectory,
-                delegatingStateRestoreListener,
-                i + 1,
-                KafkaStreams.this::closeToError,
-                this::defaultStreamsUncaughtExceptionHandler
-            );
-            threads.add(streamThread);
-            threadState.put(streamThread.getId(), streamThread.state());
-            storeProviders.add(new StreamThreadStateStoreProvider(streamThread));
-        }
-
-        ClientMetrics.addNumAliveStreamThreadMetric(streamsMetrics, (metricsConfig, now) ->
-            Math.toIntExact(threads.stream().filter(thread -> thread.state().isAlive()).count()));
-
-        final StreamStateListener streamStateListener = new StreamStateListener(threadState, globalThreadState);
+        threadState = new HashMap<>(numStreamThreads);
+        storeProviders = new ArrayList<>();
+        streamStateListener = new StreamStateListener(threadState, globalThreadState);
         if (hasGlobalTopology) {
             globalStreamThread.setStateListener(streamStateListener);
         }
-        for (final StreamThread thread : threads) {
-            thread.setStateListener(streamStateListener);
+        for (int i = 1; i <= numStreamThreads; i++) {
+            createStreamThread(cacheSizePerThread, i);
         }
 
+        ClientMetrics.addNumAliveStreamThreadMetric(streamsMetrics, (metricsConfig, now) ->
+            Math.toIntExact(threads.stream().filter(thread -> thread.state().isAlive()).count()));
+
         final GlobalStateStoreProvider globalStateStoreProvider = new GlobalStateStoreProvider(internalTopologyBuilder.globalStateStores());
         queryableStoreProvider = new QueryableStoreProvider(storeProviders, globalStateStoreProvider);
 
         stateDirCleaner = setupStateDirCleaner();
-        oldHandler = false;
         maybeWarnAboutCodeInRocksDBConfigSetter(log, config);
         rocksDBMetricsRecordingService = maybeCreateRocksDBMetricsRecordingService(clientId, config);
     }
 
+    private StreamThread createStreamThread(final long cacheSizePerThread, final int threadIdx) {
+        final StreamThread streamThread = StreamThread.create(
+            internalTopologyBuilder,
+            config,
+            clientSupplier,
+            adminClient,
+            processId,
+            clientId,
+            streamsMetrics,
+            time,
+            streamsMetadataState,
+            cacheSizePerThread,
+            stateDirectory,
+            delegatingStateRestoreListener,
+            threadIdx,
+            KafkaStreams.this::closeToError,
+            streamsUncaughtExceptionHandler
+        );
+        streamThread.setStateListener(streamStateListener);
+        threads.add(streamThread);
+        threadState.put(streamThread.getId(), streamThread.state());
+        storeProviders.add(new StreamThreadStateStoreProvider(streamThread));
+        return streamThread;
+    }
+
+    /**
+     * Adds and starts a stream thread in addition to the stream threads that are already running in this
+     * Kafka Streams client.
+     * <p>
+     * Since the number of stream threads increases, the sizes of the caches in the new stream thread
+     * and the existing stream threads are adapted so that the sum of the cache sizes over all stream
+     * threads does not exceed the total cache size specified in configuration
+     * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG}.
+     * <p>
+     * Stream threads can only be added if this Kafka Streams client is in state RUNNING or REBALANCING.
+     *
+     * @return name of the added stream thread or empty if a new stream thread could not be added
+     */
+    public Optional<String> addStreamThread() {
+        synchronized (changeThreadCount) {

Review comment:
       Why do we need this new lock-object? Would it not be simpler to just reuse `stateLock` ?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] wcarlson5 commented on a change in pull request #9615: KAFKA-10500: Add thread option

Posted by GitBox <gi...@apache.org>.
wcarlson5 commented on a change in pull request #9615:
URL: https://github.com/apache/kafka/pull/9615#discussion_r527836277



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -894,11 +904,72 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder,
         queryableStoreProvider = new QueryableStoreProvider(storeProviders, globalStateStoreProvider);
 
         stateDirCleaner = setupStateDirCleaner();
-        oldHandler = false;
         maybeWarnAboutCodeInRocksDBConfigSetter(log, config);
         rocksDBMetricsRecordingService = maybeCreateRocksDBMetricsRecordingService(clientId, config);
     }
 
+    /**
+     * Adds and starts a stream thread in addition to the stream threads that are already running in this
+     * Kafka Streams client.
+     *
+     * Since the number of stream threads increases, the sizes of the caches in the new stream thread
+     * and the existing stream threads are adapted so that the sum of the cache sizes over all stream
+     * threads does not exceed the total cache size specified in configuration
+     * {@code cache.max.bytes.buffering}.
+     *
+     * Stream threads can only be added if this Kafka Streams client is in state RUNNING or REBALANCING.
+     *
+     * @return name of the added stream thread or empty if a new stream thread could not be added
+     */
+    public Optional<String> addStreamThread() {
+        synchronized (stateLock) {

Review comment:
       Okay we can check then only synchronize around the start of the thread to make sure it doesn't shutdown between the check and the starting 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] wcarlson5 commented on a change in pull request #9615: KAFKA-10500: Add thread option

Posted by GitBox <gi...@apache.org>.
wcarlson5 commented on a change in pull request #9615:
URL: https://github.com/apache/kafka/pull/9615#discussion_r529716169



##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java
##########
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.StreamsTestUtils;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Properties;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.apache.kafka.common.utils.Utils.mkObjectProperties;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.purgeLocalStreamsState;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+@Category(IntegrationTest.class)
+public class AdjustStreamThreadCountTest {
+
+    @ClassRule
+    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);
+
+    @Rule
+    public TestName testName = new TestName();
+
+    private static String inputTopic;
+    private static StreamsBuilder builder;
+    private static Properties properties;
+    private static String appId = "";
+
+    @Before
+    public void setup() {
+        final String testId = safeUniqueTestName(getClass(), testName);
+        appId = "appId_" + testId;
+        inputTopic = "input" + testId;
+        IntegrationTestUtils.cleanStateBeforeTest(CLUSTER, inputTopic);
+
+        builder  = new StreamsBuilder();
+        builder.stream(inputTopic);
+
+        properties  = mkObjectProperties(
+                mkMap(
+                        mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()),
+                        mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, appId),
+                        mkEntry(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()),
+                        mkEntry(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2),
+                        mkEntry(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class),
+                        mkEntry(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class)
+                )
+        );
+    }
+
+    @After
+    public void teardown() throws IOException {
+        purgeLocalStreamsState(properties);
+    }
+
+    private void produceMessages(final long timestamp, final String streamOneInput, final String msg) {
+        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
+                streamOneInput,
+                Collections.singletonList(new KeyValue<>("1", msg)),
+                TestUtils.producerConfig(
+                        CLUSTER.bootstrapServers(),
+                        StringSerializer.class,
+                        StringSerializer.class,
+                        new Properties()),
+                timestamp);
+    }
+
+    @Test
+    public void shouldAddStreamThread() throws Exception {
+        try (final KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), properties)) {
+            StreamsTestUtils.startKafkaStreamsAndWaitForRunningState(kafkaStreams);
+            final int oldThreadCount = kafkaStreams.localThreadsMetadata().size();
+
+            kafkaStreams.addStreamThread();
+            produceMessages(0L, inputTopic, "A");
+
+            assertThat(kafkaStreams.localThreadsMetadata().size(), equalTo(oldThreadCount + 1));

Review comment:
       we can wait for it to be added to the thread meta data. I assume that is what you mean




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] wcarlson5 commented on a change in pull request #9615: KAFKA-10500: Add thread option

Posted by GitBox <gi...@apache.org>.
wcarlson5 commented on a change in pull request #9615:
URL: https://github.com/apache/kafka/pull/9615#discussion_r527857267



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -894,11 +904,72 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder,
         queryableStoreProvider = new QueryableStoreProvider(storeProviders, globalStateStoreProvider);
 
         stateDirCleaner = setupStateDirCleaner();
-        oldHandler = false;
         maybeWarnAboutCodeInRocksDBConfigSetter(log, config);
         rocksDBMetricsRecordingService = maybeCreateRocksDBMetricsRecordingService(clientId, config);
     }
 
+    /**
+     * Adds and starts a stream thread in addition to the stream threads that are already running in this
+     * Kafka Streams client.
+     *
+     * Since the number of stream threads increases, the sizes of the caches in the new stream thread
+     * and the existing stream threads are adapted so that the sum of the cache sizes over all stream
+     * threads does not exceed the total cache size specified in configuration
+     * {@code cache.max.bytes.buffering}.
+     *
+     * Stream threads can only be added if this Kafka Streams client is in state RUNNING or REBALANCING.
+     *
+     * @return name of the added stream thread or empty if a new stream thread could not be added
+     */
+    public Optional<String> addStreamThread() {

Review comment:
       @cadonna added




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] wcarlson5 commented on a change in pull request #9615: KAFKA-10500: Add thread option

Posted by GitBox <gi...@apache.org>.
wcarlson5 commented on a change in pull request #9615:
URL: https://github.com/apache/kafka/pull/9615#discussion_r533707640



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -894,19 +885,88 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder,
         queryableStoreProvider = new QueryableStoreProvider(storeProviders, globalStateStoreProvider);
 
         stateDirCleaner = setupStateDirCleaner();
-        oldHandler = false;
         maybeWarnAboutCodeInRocksDBConfigSetter(log, config);
         rocksDBMetricsRecordingService = maybeCreateRocksDBMetricsRecordingService(clientId, config);
     }
 
+    private StreamThread createStreamThread(final long cacheSizePerThread, final int threadIdx) {
+        final StreamThread streamThread = StreamThread.create(
+                internalTopologyBuilder,
+                config,
+                clientSupplier,
+                adminClient,
+                processId,
+                clientId,
+                streamsMetrics,
+                time,
+                streamsMetadataState,
+                cacheSizePerThread,
+                stateDirectory,
+                delegatingStateRestoreListener,
+                threadIdx,
+                KafkaStreams.this::closeToError,
+                streamsUncaughtExceptionHandler
+        );
+        threads.add(streamThread);
+        threadState.put(streamThread.getId(), streamThread.state());
+        storeProviders.add(new StreamThreadStateStoreProvider(streamThread));
+        return streamThread;
+    }
+
+    /**
+     * Adds and starts a stream thread in addition to the stream threads that are already running in this
+     * Kafka Streams client.
+     *
+     * Since the number of stream threads increases, the sizes of the caches in the new stream thread
+     * and the existing stream threads are adapted so that the sum of the cache sizes over all stream
+     * threads does not exceed the total cache size specified in configuration
+     * {@code cache.max.bytes.buffering}.
+     *
+     * Stream threads can only be added if this Kafka Streams client is in state RUNNING or REBALANCING.
+     *
+     * @return name of the added stream thread or empty if a new stream thread could not be added
+     */
+    public Optional<String> addStreamThread() {
+        if (isRunningOrRebalancing()) {
+            final int threadIdx = getNextThreadIndex();
+            final long cacheSizePerThread = getCacheSizePerThread(threads.size() + 1);
+            resizeThreadCache(cacheSizePerThread);
+            final StreamThread streamThread = createStreamThread(cacheSizePerThread, threadIdx);
+            streamThread.setStateListener(streamStateListener);
+            synchronized (stateLock) {
+                if (isRunningOrRebalancing()) {
+                    streamThread.start();

Review comment:
       I think when a state transition is changed or add is when these changes should be made. Removing from the thread list is low cost as is increasing the size of the cache, so it won't be expensive to make these changes for all cases.
   
   I think the two good options we have is that we can move the cache resize and create thread into the stateLock or we can undo the changes we made if we have to abort starting the new thread. 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mjsax commented on a change in pull request #9615: KAFKA-10500: Add thread option

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #9615:
URL: https://github.com/apache/kafka/pull/9615#discussion_r533628342



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -846,18 +856,37 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder,
                 time,
                 globalThreadId,
                 delegatingStateRestoreListener,
-                this::defaultStreamsUncaughtExceptionHandler
+                streamsUncaughtExceptionHandler
             );
             globalThreadState = globalStreamThread.state();
         }
 
         // use client id instead of thread client id since this admin client may be shared among threads
         adminClient = clientSupplier.getAdmin(config.getAdminConfigs(ClientUtils.getSharedAdminClientId(clientId)));
 
-        final Map<Long, StreamThread.State> threadState = new HashMap<>(numStreamThreads);
-        final ArrayList<StreamThreadStateStoreProvider> storeProviders = new ArrayList<>();
-        for (int i = 0; i < numStreamThreads; i++) {
-            final StreamThread streamThread = StreamThread.create(
+        threadState = new HashMap<>(numStreamThreads);
+        storeProviders = new ArrayList<>();
+        streamStateListener = new StreamStateListener(threadState, globalThreadState);
+        if (hasGlobalTopology) {
+            globalStreamThread.setStateListener(streamStateListener);
+        }
+        for (int i = 1; i <= numStreamThreads; i++) {
+            createStreamThread(cacheSizePerThread, i + 1);

Review comment:
       `i + 1` -> `i` 
   
   Wondering why this does not result in a test failure? (Or does it; Jenkins is still running.) -- Maybe we want to add a small test that verifies that we name threads correctly.

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
##########
@@ -76,6 +76,9 @@ public void resize(final long newCacheSizeBytes) {
         final boolean shrink = newCacheSizeBytes < maxCacheSizeBytes;
         maxCacheSizeBytes = newCacheSizeBytes;
         if (shrink) {
+            if (caches.values().isEmpty()) {

Review comment:
       I see -- so it's a bug fix on the side (not directly related to this RP) -- the original PR that added this method should have added this check.

##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -894,19 +885,88 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder,
         queryableStoreProvider = new QueryableStoreProvider(storeProviders, globalStateStoreProvider);
 
         stateDirCleaner = setupStateDirCleaner();
-        oldHandler = false;
         maybeWarnAboutCodeInRocksDBConfigSetter(log, config);
         rocksDBMetricsRecordingService = maybeCreateRocksDBMetricsRecordingService(clientId, config);
     }
 
+    private StreamThread createStreamThread(final long cacheSizePerThread, final int threadIdx) {
+        final StreamThread streamThread = StreamThread.create(
+                internalTopologyBuilder,
+                config,
+                clientSupplier,
+                adminClient,
+                processId,
+                clientId,
+                streamsMetrics,
+                time,
+                streamsMetadataState,
+                cacheSizePerThread,
+                stateDirectory,
+                delegatingStateRestoreListener,
+                threadIdx,
+                KafkaStreams.this::closeToError,
+                streamsUncaughtExceptionHandler
+        );
+        threads.add(streamThread);
+        threadState.put(streamThread.getId(), streamThread.state());
+        storeProviders.add(new StreamThreadStateStoreProvider(streamThread));
+        return streamThread;
+    }
+
+    /**
+     * Adds and starts a stream thread in addition to the stream threads that are already running in this
+     * Kafka Streams client.
+     *
+     * Since the number of stream threads increases, the sizes of the caches in the new stream thread
+     * and the existing stream threads are adapted so that the sum of the cache sizes over all stream
+     * threads does not exceed the total cache size specified in configuration
+     * {@code cache.max.bytes.buffering}.
+     *
+     * Stream threads can only be added if this Kafka Streams client is in state RUNNING or REBALANCING.
+     *
+     * @return name of the added stream thread or empty if a new stream thread could not be added
+     */
+    public Optional<String> addStreamThread() {
+        if (isRunningOrRebalancing()) {
+            final int threadIdx = getNextThreadIndex();
+            final long cacheSizePerThread = getCacheSizePerThread(threads.size() + 1);
+            resizeThreadCache(cacheSizePerThread);
+            final StreamThread streamThread = createStreamThread(cacheSizePerThread, threadIdx);
+            streamThread.setStateListener(streamStateListener);
+            synchronized (stateLock) {
+                if (isRunningOrRebalancing()) {
+                    streamThread.start();
+                    return Optional.of(streamThread.getName());
+                } else {
+                    return Optional.empty();
+                }
+            }
+        } else {
+            return Optional.empty();
+        }
+    }
+
+    private int getNextThreadIndex() {
+        final HashSet<String> names = new HashSet<>();

Review comment:
       That is not what I meant. But it might not matter much anyway.
   
   While we need to loop over all used names in L951 below to reuse, we don't need to compute `names` from scratch but would just modify `names` each time we add/remove a thread. But it's not perf-critical so re-doing the computation is fine, too.

##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -894,19 +885,88 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder,
         queryableStoreProvider = new QueryableStoreProvider(storeProviders, globalStateStoreProvider);
 
         stateDirCleaner = setupStateDirCleaner();
-        oldHandler = false;
         maybeWarnAboutCodeInRocksDBConfigSetter(log, config);
         rocksDBMetricsRecordingService = maybeCreateRocksDBMetricsRecordingService(clientId, config);
     }
 
+    private StreamThread createStreamThread(final long cacheSizePerThread, final int threadIdx) {
+        final StreamThread streamThread = StreamThread.create(
+                internalTopologyBuilder,
+                config,
+                clientSupplier,
+                adminClient,
+                processId,
+                clientId,
+                streamsMetrics,
+                time,
+                streamsMetadataState,
+                cacheSizePerThread,
+                stateDirectory,
+                delegatingStateRestoreListener,
+                threadIdx,
+                KafkaStreams.this::closeToError,
+                streamsUncaughtExceptionHandler
+        );
+        threads.add(streamThread);
+        threadState.put(streamThread.getId(), streamThread.state());
+        storeProviders.add(new StreamThreadStateStoreProvider(streamThread));
+        return streamThread;
+    }
+
+    /**
+     * Adds and starts a stream thread in addition to the stream threads that are already running in this
+     * Kafka Streams client.
+     *
+     * Since the number of stream threads increases, the sizes of the caches in the new stream thread
+     * and the existing stream threads are adapted so that the sum of the cache sizes over all stream
+     * threads does not exceed the total cache size specified in configuration
+     * {@code cache.max.bytes.buffering}.
+     *
+     * Stream threads can only be added if this Kafka Streams client is in state RUNNING or REBALANCING.
+     *
+     * @return name of the added stream thread or empty if a new stream thread could not be added
+     */
+    public Optional<String> addStreamThread() {
+        if (isRunningOrRebalancing()) {
+            final int threadIdx = getNextThreadIndex();
+            final long cacheSizePerThread = getCacheSizePerThread(threads.size() + 1);
+            resizeThreadCache(cacheSizePerThread);
+            final StreamThread streamThread = createStreamThread(cacheSizePerThread, threadIdx);
+            streamThread.setStateListener(streamStateListener);
+            synchronized (stateLock) {
+                if (isRunningOrRebalancing()) {
+                    streamThread.start();

Review comment:
       Maybe. We should ensure that we do proper cleanup for all cases.
   
   What make we wonder: I don't see any code (except the `remove` you added below) that would remove a `StreamThread` from the list? Will this be done in a different PR?

##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -894,19 +885,88 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder,
         queryableStoreProvider = new QueryableStoreProvider(storeProviders, globalStateStoreProvider);
 
         stateDirCleaner = setupStateDirCleaner();
-        oldHandler = false;
         maybeWarnAboutCodeInRocksDBConfigSetter(log, config);
         rocksDBMetricsRecordingService = maybeCreateRocksDBMetricsRecordingService(clientId, config);
     }
 
+    private StreamThread createStreamThread(final long cacheSizePerThread, final int threadIdx) {
+        final StreamThread streamThread = StreamThread.create(
+                internalTopologyBuilder,
+                config,
+                clientSupplier,
+                adminClient,
+                processId,
+                clientId,
+                streamsMetrics,
+                time,
+                streamsMetadataState,
+                cacheSizePerThread,
+                stateDirectory,
+                delegatingStateRestoreListener,
+                threadIdx,
+                KafkaStreams.this::closeToError,
+                streamsUncaughtExceptionHandler
+        );
+        threads.add(streamThread);
+        threadState.put(streamThread.getId(), streamThread.state());
+        storeProviders.add(new StreamThreadStateStoreProvider(streamThread));
+        return streamThread;
+    }
+
+    /**
+     * Adds and starts a stream thread in addition to the stream threads that are already running in this
+     * Kafka Streams client.
+     *
+     * Since the number of stream threads increases, the sizes of the caches in the new stream thread
+     * and the existing stream threads are adapted so that the sum of the cache sizes over all stream
+     * threads does not exceed the total cache size specified in configuration
+     * {@code cache.max.bytes.buffering}.
+     *
+     * Stream threads can only be added if this Kafka Streams client is in state RUNNING or REBALANCING.
+     *
+     * @return name of the added stream thread or empty if a new stream thread could not be added
+     */
+    public Optional<String> addStreamThread() {
+        if (isRunningOrRebalancing()) {
+            final int threadIdx = getNextThreadIndex();
+            final long cacheSizePerThread = getCacheSizePerThread(threads.size() + 1);
+            resizeThreadCache(cacheSizePerThread);
+            final StreamThread streamThread = createStreamThread(cacheSizePerThread, threadIdx);
+            streamThread.setStateListener(streamStateListener);
+            synchronized (stateLock) {

Review comment:
       I see. So we exploit that possible state transitions are limited. Thanks for explaining. Makes sense.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] cadonna commented on a change in pull request #9615: KAFKA-10500: Add thread option

Posted by GitBox <gi...@apache.org>.
cadonna commented on a change in pull request #9615:
URL: https://github.com/apache/kafka/pull/9615#discussion_r534319500



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -870,43 +899,73 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder,
                 cacheSizePerThread,
                 stateDirectory,
                 delegatingStateRestoreListener,
-                i + 1,
+                threadIdx,
                 KafkaStreams.this::closeToError,
-                this::defaultStreamsUncaughtExceptionHandler
-            );
-            threads.add(streamThread);
-            threadState.put(streamThread.getId(), streamThread.state());
-            storeProviders.add(new StreamThreadStateStoreProvider(streamThread));
-        }
+                streamsUncaughtExceptionHandler
+        );
+        streamThread.setStateListener(streamStateListener);
+        threads.add(streamThread);
+        threadState.put(streamThread.getId(), streamThread.state());
+        storeProviders.add(new StreamThreadStateStoreProvider(streamThread));
+        return streamThread;
+    }
 
-        ClientMetrics.addNumAliveStreamThreadMetric(streamsMetrics, (metricsConfig, now) ->
-            Math.toIntExact(threads.stream().filter(thread -> thread.state().isAlive()).count()));
+    /**
+     * Adds and starts a stream thread in addition to the stream threads that are already running in this
+     * Kafka Streams client.
+     * <p>
+     * Since the number of stream threads increases, the sizes of the caches in the new stream thread
+     * and the existing stream threads are adapted so that the sum of the cache sizes over all stream
+     * threads does not exceed the total cache size specified in configuration
+     * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG}.
+     * <p>
+     * Stream threads can only be added if this Kafka Streams client is in state RUNNING or REBALANCING.
+     *
+     * @return name of the added stream thread or empty if a new stream thread could not be added
+     */
+    public Optional<String> addStreamThread() {
+        if (isRunningOrRebalancing()) {
+            final int threadIdx = getNextThreadIndex();
+            final long cacheSizePerThread = getCacheSizePerThread(threads.size() + 1);
+            resizeThreadCache(cacheSizePerThread);
+            final StreamThread streamThread = createStreamThread(cacheSizePerThread, threadIdx);
+            synchronized (stateLock) {
+                if (isRunningOrRebalancing()) {
+                    streamThread.start();
+                    return Optional.of(streamThread.getName());
+                } else {
+                    threads.remove(streamThread);

Review comment:
       > Unfortunately I don't think we can shutdown a thread until we have started it.
   
   Have a look at https://github.com/apache/kafka/blob/aeeb7b2f9a9abe8f49543a2278757722e5974cb3/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java#L976-L983




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] wcarlson5 commented on a change in pull request #9615: KAFKA-10500: Add thread option

Posted by GitBox <gi...@apache.org>.
wcarlson5 commented on a change in pull request #9615:
URL: https://github.com/apache/kafka/pull/9615#discussion_r536318737



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -845,67 +856,118 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder,
                 time,
                 globalThreadId,
                 delegatingStateRestoreListener,
-                this::defaultStreamsUncaughtExceptionHandler
+                streamsUncaughtExceptionHandler
             );
             globalThreadState = globalStreamThread.state();
         }
 
         // use client id instead of thread client id since this admin client may be shared among threads
         adminClient = clientSupplier.getAdmin(config.getAdminConfigs(ClientUtils.getSharedAdminClientId(clientId)));
 
-        final Map<Long, StreamThread.State> threadState = new HashMap<>(numStreamThreads);
-        final ArrayList<StreamThreadStateStoreProvider> storeProviders = new ArrayList<>();
-        for (int i = 0; i < numStreamThreads; i++) {
-            final StreamThread streamThread = StreamThread.create(
-                internalTopologyBuilder,
-                config,
-                clientSupplier,
-                adminClient,
-                processId,
-                clientId,
-                streamsMetrics,
-                time,
-                streamsMetadataState,
-                cacheSizePerThread,
-                stateDirectory,
-                delegatingStateRestoreListener,
-                i + 1,
-                KafkaStreams.this::closeToError,
-                this::defaultStreamsUncaughtExceptionHandler
-            );
-            threads.add(streamThread);
-            threadState.put(streamThread.getId(), streamThread.state());
-            storeProviders.add(new StreamThreadStateStoreProvider(streamThread));
-        }
-
-        ClientMetrics.addNumAliveStreamThreadMetric(streamsMetrics, (metricsConfig, now) ->
-            Math.toIntExact(threads.stream().filter(thread -> thread.state().isAlive()).count()));
-
-        final StreamStateListener streamStateListener = new StreamStateListener(threadState, globalThreadState);
+        threadState = new HashMap<>(numStreamThreads);
+        storeProviders = new ArrayList<>();
+        streamStateListener = new StreamStateListener(threadState, globalThreadState);
         if (hasGlobalTopology) {
             globalStreamThread.setStateListener(streamStateListener);
         }
-        for (final StreamThread thread : threads) {
-            thread.setStateListener(streamStateListener);
+        for (int i = 1; i <= numStreamThreads; i++) {
+            createStreamThread(cacheSizePerThread, i);
         }
 
+        ClientMetrics.addNumAliveStreamThreadMetric(streamsMetrics, (metricsConfig, now) ->
+            Math.toIntExact(threads.stream().filter(thread -> thread.state().isAlive()).count()));
+
         final GlobalStateStoreProvider globalStateStoreProvider = new GlobalStateStoreProvider(internalTopologyBuilder.globalStateStores());
         queryableStoreProvider = new QueryableStoreProvider(storeProviders, globalStateStoreProvider);
 
         stateDirCleaner = setupStateDirCleaner();
-        oldHandler = false;
         maybeWarnAboutCodeInRocksDBConfigSetter(log, config);
         rocksDBMetricsRecordingService = maybeCreateRocksDBMetricsRecordingService(clientId, config);
     }
 
+    private StreamThread createStreamThread(final long cacheSizePerThread, final int threadIdx) {
+        final StreamThread streamThread = StreamThread.create(
+            internalTopologyBuilder,
+            config,
+            clientSupplier,
+            adminClient,
+            processId,
+            clientId,
+            streamsMetrics,
+            time,
+            streamsMetadataState,
+            cacheSizePerThread,
+            stateDirectory,
+            delegatingStateRestoreListener,
+            threadIdx,
+            KafkaStreams.this::closeToError,
+            streamsUncaughtExceptionHandler
+        );
+        streamThread.setStateListener(streamStateListener);
+        threads.add(streamThread);
+        threadState.put(streamThread.getId(), streamThread.state());
+        storeProviders.add(new StreamThreadStateStoreProvider(streamThread));
+        return streamThread;
+    }
+
+    /**
+     * Adds and starts a stream thread in addition to the stream threads that are already running in this
+     * Kafka Streams client.
+     * <p>
+     * Since the number of stream threads increases, the sizes of the caches in the new stream thread
+     * and the existing stream threads are adapted so that the sum of the cache sizes over all stream
+     * threads does not exceed the total cache size specified in configuration
+     * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG}.
+     * <p>
+     * Stream threads can only be added if this Kafka Streams client is in state RUNNING or REBALANCING.
+     *
+     * @return name of the added stream thread or empty if a new stream thread could not be added
+     */
+    public Optional<String> addStreamThread() {
+        synchronized (changeThreadCount) {

Review comment:
       We thought it would be better use a separate lock because it is serving a different purpose. It will also use used in remove thread. It might be simpler to reuse the statelock but I don’t think that would be cleaner. We are really locking on the thread cache access and the thread indexes




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mjsax commented on a change in pull request #9615: KAFKA-10500: Add thread option

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #9615:
URL: https://github.com/apache/kafka/pull/9615#discussion_r533626328



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -398,6 +407,7 @@ public void setUncaughtExceptionHandler(final StreamsUncaughtExceptionHandler st
         final Consumer<Throwable> handler = exception -> handleStreamsUncaughtException(exception, streamsUncaughtExceptionHandler);
         synchronized (stateLock) {
             if (state == State.CREATED) {
+                this.streamsUncaughtExceptionHandler = handler;

Review comment:
       Ah. Was missing that as you assign `handler` that is not a `StreamsUncaughtExceptionHandler` but a `Consumer<Throwable>`...




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mjsax merged pull request #9615: KAFKA-10500: Add thread option

Posted by GitBox <gi...@apache.org>.
mjsax merged pull request #9615:
URL: https://github.com/apache/kafka/pull/9615


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] cadonna commented on a change in pull request #9615: KAFKA-10500: Add thread option

Posted by GitBox <gi...@apache.org>.
cadonna commented on a change in pull request #9615:
URL: https://github.com/apache/kafka/pull/9615#discussion_r535383909



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -870,43 +900,75 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder,
                 cacheSizePerThread,
                 stateDirectory,
                 delegatingStateRestoreListener,
-                i + 1,
+                threadIdx,
                 KafkaStreams.this::closeToError,
-                this::defaultStreamsUncaughtExceptionHandler
-            );
-            threads.add(streamThread);
-            threadState.put(streamThread.getId(), streamThread.state());
-            storeProviders.add(new StreamThreadStateStoreProvider(streamThread));
-        }
+                streamsUncaughtExceptionHandler

Review comment:
       nit: If it happens that you need to push another commit, could you fix the indentation here? Sorry that I haven't noticed this before.

##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -870,43 +900,75 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder,
                 cacheSizePerThread,
                 stateDirectory,
                 delegatingStateRestoreListener,
-                i + 1,
+                threadIdx,
                 KafkaStreams.this::closeToError,
-                this::defaultStreamsUncaughtExceptionHandler
-            );
-            threads.add(streamThread);
-            threadState.put(streamThread.getId(), streamThread.state());
-            storeProviders.add(new StreamThreadStateStoreProvider(streamThread));
-        }
+                streamsUncaughtExceptionHandler
+        );
+        streamThread.setStateListener(streamStateListener);
+        threads.add(streamThread);
+        threadState.put(streamThread.getId(), streamThread.state());
+        storeProviders.add(new StreamThreadStateStoreProvider(streamThread));
+        return streamThread;
+    }
 
-        ClientMetrics.addNumAliveStreamThreadMetric(streamsMetrics, (metricsConfig, now) ->
-            Math.toIntExact(threads.stream().filter(thread -> thread.state().isAlive()).count()));
+    /**
+     * Adds and starts a stream thread in addition to the stream threads that are already running in this
+     * Kafka Streams client.
+     * <p>
+     * Since the number of stream threads increases, the sizes of the caches in the new stream thread
+     * and the existing stream threads are adapted so that the sum of the cache sizes over all stream
+     * threads does not exceed the total cache size specified in configuration
+     * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG}.
+     * <p>
+     * Stream threads can only be added if this Kafka Streams client is in state RUNNING or REBALANCING.
+     *
+     * @return name of the added stream thread or empty if a new stream thread could not be added
+     */
+    public Optional<String> addStreamThread() {
+        synchronized (newThread) {
+            if (isRunningOrRebalancing()) {
+                final int threadIdx = getNextThreadIndex();
+                final long cacheSizePerThread = getCacheSizePerThread(threads.size() + 1);
+                resizeThreadCache(cacheSizePerThread);
+                final StreamThread streamThread = createStreamThread(cacheSizePerThread, threadIdx);
+                synchronized (stateLock) {
+                    if (isRunningOrRebalancing()) {
+                        streamThread.start();
+                        return Optional.of(streamThread.getName());
+                    } else {
+                        streamThread.shutdown();
+                        threads.remove(streamThread);
+                        resizeThreadCache(getCacheSizePerThread(threads.size()));
+                        return Optional.empty();
+                    }
+                }
+            }
+        }
+        return Optional.empty();
+    }
 
-        final StreamStateListener streamStateListener = new StreamStateListener(threadState, globalThreadState);
-        if (hasGlobalTopology) {
-            globalStreamThread.setStateListener(streamStateListener);
+    private int getNextThreadIndex() {
+        final HashSet<String> names = new HashSet<>();
+        for (final StreamThread streamThread: threads) {
+            names.add(streamThread.getName());
         }
-        for (final StreamThread thread : threads) {
-            thread.setStateListener(streamStateListener);
+        final String baseName = clientId + "-StreamThread-";
+        for (int i = 0; i < threads.size(); i++) {

Review comment:
       Shouldn't that be `int i = 1; i <= threads.size(); i++`? Otherwise, we would look up `*-StreamThread-0"` and we would not look up `"*-StreamThread-" + threads.size()`.
   
   Could you add some tests that check the correct naming as @mjsax suggested? 

##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -870,43 +900,75 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder,
                 cacheSizePerThread,
                 stateDirectory,
                 delegatingStateRestoreListener,
-                i + 1,
+                threadIdx,
                 KafkaStreams.this::closeToError,
-                this::defaultStreamsUncaughtExceptionHandler
-            );
-            threads.add(streamThread);
-            threadState.put(streamThread.getId(), streamThread.state());
-            storeProviders.add(new StreamThreadStateStoreProvider(streamThread));
-        }
+                streamsUncaughtExceptionHandler
+        );
+        streamThread.setStateListener(streamStateListener);
+        threads.add(streamThread);
+        threadState.put(streamThread.getId(), streamThread.state());
+        storeProviders.add(new StreamThreadStateStoreProvider(streamThread));
+        return streamThread;
+    }
 
-        ClientMetrics.addNumAliveStreamThreadMetric(streamsMetrics, (metricsConfig, now) ->
-            Math.toIntExact(threads.stream().filter(thread -> thread.state().isAlive()).count()));
+    /**
+     * Adds and starts a stream thread in addition to the stream threads that are already running in this
+     * Kafka Streams client.
+     * <p>
+     * Since the number of stream threads increases, the sizes of the caches in the new stream thread
+     * and the existing stream threads are adapted so that the sum of the cache sizes over all stream
+     * threads does not exceed the total cache size specified in configuration
+     * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG}.
+     * <p>
+     * Stream threads can only be added if this Kafka Streams client is in state RUNNING or REBALANCING.
+     *
+     * @return name of the added stream thread or empty if a new stream thread could not be added
+     */
+    public Optional<String> addStreamThread() {
+        synchronized (newThread) {
+            if (isRunningOrRebalancing()) {
+                final int threadIdx = getNextThreadIndex();
+                final long cacheSizePerThread = getCacheSizePerThread(threads.size() + 1);
+                resizeThreadCache(cacheSizePerThread);
+                final StreamThread streamThread = createStreamThread(cacheSizePerThread, threadIdx);
+                synchronized (stateLock) {

Review comment:
       Sorry to bother you again with the synchronization on the `stateLock`, but could you explain why we still need it after we synchronize on `newThread`?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] wcarlson5 commented on a change in pull request #9615: KAFKA-10500: Add thread option

Posted by GitBox <gi...@apache.org>.
wcarlson5 commented on a change in pull request #9615:
URL: https://github.com/apache/kafka/pull/9615#discussion_r529713764



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -894,19 +885,88 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder,
         queryableStoreProvider = new QueryableStoreProvider(storeProviders, globalStateStoreProvider);
 
         stateDirCleaner = setupStateDirCleaner();
-        oldHandler = false;
         maybeWarnAboutCodeInRocksDBConfigSetter(log, config);
         rocksDBMetricsRecordingService = maybeCreateRocksDBMetricsRecordingService(clientId, config);
     }
 
+    private StreamThread createStreamThread(final long cacheSizePerThread, final int threadIdx) {
+        final StreamThread streamThread = StreamThread.create(
+                internalTopologyBuilder,
+                config,
+                clientSupplier,
+                adminClient,
+                processId,
+                clientId,
+                streamsMetrics,
+                time,
+                streamsMetadataState,
+                cacheSizePerThread,
+                stateDirectory,
+                delegatingStateRestoreListener,
+                threadIdx,
+                KafkaStreams.this::closeToError,
+                streamsUncaughtExceptionHandler
+        );
+        threads.add(streamThread);
+        threadState.put(streamThread.getId(), streamThread.state());
+        storeProviders.add(new StreamThreadStateStoreProvider(streamThread));
+        return streamThread;
+    }
+
+    /**
+     * Adds and starts a stream thread in addition to the stream threads that are already running in this
+     * Kafka Streams client.
+     *
+     * Since the number of stream threads increases, the sizes of the caches in the new stream thread
+     * and the existing stream threads are adapted so that the sum of the cache sizes over all stream
+     * threads does not exceed the total cache size specified in configuration
+     * {@code cache.max.bytes.buffering}.
+     *
+     * Stream threads can only be added if this Kafka Streams client is in state RUNNING or REBALANCING.
+     *
+     * @return name of the added stream thread or empty if a new stream thread could not be added
+     */
+    public Optional<String> addStreamThread() {
+        if (isRunningOrRebalancing()) {
+            final int threadIdx = getNextThreadIndex();
+            final long cacheSizePerThread = getCacheSizePerThread(threads.size() + 1);
+            resizeThreadCache(cacheSizePerThread);
+            final StreamThread streamThread = createStreamThread(cacheSizePerThread, threadIdx);
+            streamThread.setStateListener(streamStateListener);
+            synchronized (stateLock) {
+                if (isRunningOrRebalancing()) {
+                    streamThread.start();
+                } else {
+                    return Optional.empty();
+                }
+            }
+            return Optional.of(streamThread.getName());

Review comment:
       sure




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] wcarlson5 commented on a change in pull request #9615: KAFKA-10500: Add thread option

Posted by GitBox <gi...@apache.org>.
wcarlson5 commented on a change in pull request #9615:
URL: https://github.com/apache/kafka/pull/9615#discussion_r527277135



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -894,11 +904,72 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder,
         queryableStoreProvider = new QueryableStoreProvider(storeProviders, globalStateStoreProvider);
 
         stateDirCleaner = setupStateDirCleaner();
-        oldHandler = false;
         maybeWarnAboutCodeInRocksDBConfigSetter(log, config);
         rocksDBMetricsRecordingService = maybeCreateRocksDBMetricsRecordingService(clientId, config);
     }
 
+    /**
+     * Adds and starts a stream thread in addition to the stream threads that are already running in this
+     * Kafka Streams client.
+     *
+     * Since the number of stream threads increases, the sizes of the caches in the new stream thread
+     * and the existing stream threads are adapted so that the sum of the cache sizes over all stream
+     * threads does not exceed the total cache size specified in configuration
+     * {@code cache.max.bytes.buffering}.
+     *
+     * Stream threads can only be added if this Kafka Streams client is in state RUNNING or REBALANCING.
+     *
+     * @return name of the added stream thread or empty if a new stream thread could not be added
+     */
+    public Optional<String> addStreamThread() {

Review comment:
       you are right, Ill add one




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] cadonna commented on a change in pull request #9615: KAFKA-10500: Add thread option

Posted by GitBox <gi...@apache.org>.
cadonna commented on a change in pull request #9615:
URL: https://github.com/apache/kafka/pull/9615#discussion_r529858401



##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java
##########
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.processor.ThreadMetadata;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.StreamsTestUtils;
+import org.apache.kafka.test.TestUtils;
+import org.hamcrest.CoreMatchers;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+import java.io.IOException;
+import java.util.Optional;
+import java.util.Properties;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.apache.kafka.common.utils.Utils.mkObjectProperties;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.purgeLocalStreamsState;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+@Category(IntegrationTest.class)
+public class AdjustStreamThreadCountTest {
+
+    @ClassRule
+    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);
+
+    @Rule
+    public TestName testName = new TestName();
+
+    private static String inputTopic;
+    private static StreamsBuilder builder;
+    private static Properties properties;
+    private static String appId = "";
+
+    @Before
+    public void setup() {
+        final String testId = safeUniqueTestName(getClass(), testName);
+        appId = "appId_" + testId;
+        inputTopic = "input" + testId;
+        IntegrationTestUtils.cleanStateBeforeTest(CLUSTER, inputTopic);
+
+        builder  = new StreamsBuilder();
+        builder.stream(inputTopic);
+
+        properties  = mkObjectProperties(
+            mkMap(
+                mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()),
+                mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, appId),
+                mkEntry(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()),
+                mkEntry(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2),
+                mkEntry(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class),
+                mkEntry(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class)
+            )
+        );
+    }
+
+    @After
+    public void teardown() throws IOException {
+        purgeLocalStreamsState(properties);
+    }
+
+    @Test
+    public void shouldAddStreamThread() throws Exception {
+        try (final KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), properties)) {
+            StreamsTestUtils.startKafkaStreamsAndWaitForRunningState(kafkaStreams);
+            final int oldThreadCount = kafkaStreams.localThreadsMetadata().size();
+
+            final Optional<String> name = kafkaStreams.addStreamThread();
+            assertThat(name, CoreMatchers.not(Optional.empty()));
+            TestUtils.waitForCondition(() -> kafkaStreams.localThreadsMetadata().stream().sequential().map(ThreadMetadata::threadName).anyMatch(t -> t.equals(name.orElse(""))),
+                "Wait for the thread to be added");
+            assertThat(kafkaStreams.localThreadsMetadata().size(), equalTo(oldThreadCount + 1));

Review comment:
       nit: If not all parameters fit on one line, we put each of them on a new line. Additionally we put also the closing parenthesis on a new line.
   
   nit: Lines should -- if possible -- not exceed 120 characters.
   
   nit: I like when tests are visually structured with one block for setup, the call under test, and one block for verifications. 
    
   ```suggestion
               StreamsTestUtils.startKafkaStreamsAndWaitForRunningState(kafkaStreams);
               final int oldThreadCount = kafkaStreams.localThreadsMetadata().size();
   
               final Optional<String> name = kafkaStreams.addStreamThread();
               
               assertThat(name, CoreMatchers.not(Optional.empty()));
               TestUtils.waitForCondition(
                   () -> kafkaStreams.localThreadsMetadata().stream().sequential()
                       .map(ThreadMetadata::threadName).anyMatch(t -> t.equals(name.orElse(""))), 
                   "Wait for the thread to be added"
               );
               assertThat(kafkaStreams.localThreadsMetadata().size(), equalTo(oldThreadCount + 1));
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mjsax commented on a change in pull request #9615: KAFKA-10500: Add thread option

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #9615:
URL: https://github.com/apache/kafka/pull/9615#discussion_r533026232



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -398,6 +407,7 @@ public void setUncaughtExceptionHandler(final StreamsUncaughtExceptionHandler st
         final Consumer<Throwable> handler = exception -> handleStreamsUncaughtException(exception, streamsUncaughtExceptionHandler);
         synchronized (stateLock) {
             if (state == State.CREATED) {
+                this.streamsUncaughtExceptionHandler = handler;

Review comment:
       nit. remove unnecessary `this.`

##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -846,43 +856,24 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder,
                 time,
                 globalThreadId,
                 delegatingStateRestoreListener,
-                this::defaultStreamsUncaughtExceptionHandler
+                streamsUncaughtExceptionHandler
             );
             globalThreadState = globalStreamThread.state();
         }
 
         // use client id instead of thread client id since this admin client may be shared among threads
         adminClient = clientSupplier.getAdmin(config.getAdminConfigs(ClientUtils.getSharedAdminClientId(clientId)));
 
-        final Map<Long, StreamThread.State> threadState = new HashMap<>(numStreamThreads);
-        final ArrayList<StreamThreadStateStoreProvider> storeProviders = new ArrayList<>();
+        threadState = new HashMap<>(numStreamThreads);
+        storeProviders = new ArrayList<>();
         for (int i = 0; i < numStreamThreads; i++) {
-            final StreamThread streamThread = StreamThread.create(
-                internalTopologyBuilder,
-                config,
-                clientSupplier,
-                adminClient,
-                processId,
-                clientId,
-                streamsMetrics,
-                time,
-                streamsMetadataState,
-                cacheSizePerThread,
-                stateDirectory,
-                delegatingStateRestoreListener,
-                i + 1,
-                KafkaStreams.this::closeToError,
-                this::defaultStreamsUncaughtExceptionHandler
-            );
-            threads.add(streamThread);
-            threadState.put(streamThread.getId(), streamThread.state());
-            storeProviders.add(new StreamThreadStateStoreProvider(streamThread));
+            createStreamThread(cacheSizePerThread, i + 1);

Review comment:
       Nit: can we change the loop to `int = 1; i <= numStreamThreads` and just pass in `i` here?

##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -846,43 +856,24 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder,
                 time,
                 globalThreadId,
                 delegatingStateRestoreListener,
-                this::defaultStreamsUncaughtExceptionHandler
+                streamsUncaughtExceptionHandler
             );
             globalThreadState = globalStreamThread.state();
         }
 
         // use client id instead of thread client id since this admin client may be shared among threads
         adminClient = clientSupplier.getAdmin(config.getAdminConfigs(ClientUtils.getSharedAdminClientId(clientId)));
 
-        final Map<Long, StreamThread.State> threadState = new HashMap<>(numStreamThreads);
-        final ArrayList<StreamThreadStateStoreProvider> storeProviders = new ArrayList<>();
+        threadState = new HashMap<>(numStreamThreads);
+        storeProviders = new ArrayList<>();
         for (int i = 0; i < numStreamThreads; i++) {
-            final StreamThread streamThread = StreamThread.create(
-                internalTopologyBuilder,
-                config,
-                clientSupplier,
-                adminClient,
-                processId,
-                clientId,
-                streamsMetrics,
-                time,
-                streamsMetadataState,
-                cacheSizePerThread,
-                stateDirectory,
-                delegatingStateRestoreListener,
-                i + 1,
-                KafkaStreams.this::closeToError,
-                this::defaultStreamsUncaughtExceptionHandler
-            );
-            threads.add(streamThread);
-            threadState.put(streamThread.getId(), streamThread.state());
-            storeProviders.add(new StreamThreadStateStoreProvider(streamThread));
+            createStreamThread(cacheSizePerThread, i + 1);
         }
 
         ClientMetrics.addNumAliveStreamThreadMetric(streamsMetrics, (metricsConfig, now) ->
             Math.toIntExact(threads.stream().filter(thread -> thread.state().isAlive()).count()));
 
-        final StreamStateListener streamStateListener = new StreamStateListener(threadState, globalThreadState);
+        streamStateListener = new StreamStateListener(threadState, globalThreadState);

Review comment:
       Can we create the `StreamStateListener` before we call `createStreamThread` and do `setStateListener` within `createStreamThread` ? If yes, we also don't need to call `setStateListener` within `addStreamThread`

##########
File path: streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
##########
@@ -588,6 +592,25 @@ public void testCloseIsIdempotent() {
             closeCount, MockMetricsReporter.CLOSE_COUNT.get());
     }
 
+    @Test
+    public void shouldAddThread() throws InterruptedException {
+        props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1);
+        final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time);
+        streams.start();
+        final int oldSize = streams.threads.size();
+        TestUtils.waitForCondition(() -> streams.state() == KafkaStreams.State.RUNNING, 15L, "wait until running");
+        assertThat(streams.addStreamThread(), equalTo(Optional.of("newThread")));
+        assertThat(streams.threads.size(), equalTo(oldSize + 1));
+    }
+
+    @Test
+    public void shouldNotAddThread() {

Review comment:
       `shouldNotAddThreadWhenCreated`
   
   Should we also `close()` `KafkaStreams` and check that adding a thread is not possible and/or even put the client into ERROR state and test?

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
##########
@@ -76,6 +76,9 @@ public void resize(final long newCacheSizeBytes) {
         final boolean shrink = newCacheSizeBytes < maxCacheSizeBytes;
         maxCacheSizeBytes = newCacheSizeBytes;
         if (shrink) {
+            if (caches.values().isEmpty()) {

Review comment:
       Why this check?

##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -894,19 +885,88 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder,
         queryableStoreProvider = new QueryableStoreProvider(storeProviders, globalStateStoreProvider);
 
         stateDirCleaner = setupStateDirCleaner();
-        oldHandler = false;
         maybeWarnAboutCodeInRocksDBConfigSetter(log, config);
         rocksDBMetricsRecordingService = maybeCreateRocksDBMetricsRecordingService(clientId, config);
     }
 
+    private StreamThread createStreamThread(final long cacheSizePerThread, final int threadIdx) {
+        final StreamThread streamThread = StreamThread.create(
+                internalTopologyBuilder,
+                config,
+                clientSupplier,
+                adminClient,
+                processId,
+                clientId,
+                streamsMetrics,
+                time,
+                streamsMetadataState,
+                cacheSizePerThread,
+                stateDirectory,
+                delegatingStateRestoreListener,
+                threadIdx,
+                KafkaStreams.this::closeToError,
+                streamsUncaughtExceptionHandler
+        );
+        threads.add(streamThread);
+        threadState.put(streamThread.getId(), streamThread.state());
+        storeProviders.add(new StreamThreadStateStoreProvider(streamThread));
+        return streamThread;
+    }
+
+    /**
+     * Adds and starts a stream thread in addition to the stream threads that are already running in this
+     * Kafka Streams client.
+     *
+     * Since the number of stream threads increases, the sizes of the caches in the new stream thread
+     * and the existing stream threads are adapted so that the sum of the cache sizes over all stream
+     * threads does not exceed the total cache size specified in configuration
+     * {@code cache.max.bytes.buffering}.
+     *
+     * Stream threads can only be added if this Kafka Streams client is in state RUNNING or REBALANCING.
+     *
+     * @return name of the added stream thread or empty if a new stream thread could not be added
+     */
+    public Optional<String> addStreamThread() {
+        if (isRunningOrRebalancing()) {
+            final int threadIdx = getNextThreadIndex();
+            final long cacheSizePerThread = getCacheSizePerThread(threads.size() + 1);
+            resizeThreadCache(cacheSizePerThread);
+            final StreamThread streamThread = createStreamThread(cacheSizePerThread, threadIdx);
+            streamThread.setStateListener(streamStateListener);
+            synchronized (stateLock) {

Review comment:
       Don't we need to get the `stateLock` as an outer most guard (and not check `isRunningOrRebalancing()` twice)? It seems weird to create a thread but later not start it and throw it away -- especially because we resize the caches (but also don't undo the resizing)?

##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -894,19 +885,88 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder,
         queryableStoreProvider = new QueryableStoreProvider(storeProviders, globalStateStoreProvider);
 
         stateDirCleaner = setupStateDirCleaner();
-        oldHandler = false;
         maybeWarnAboutCodeInRocksDBConfigSetter(log, config);
         rocksDBMetricsRecordingService = maybeCreateRocksDBMetricsRecordingService(clientId, config);
     }
 
+    private StreamThread createStreamThread(final long cacheSizePerThread, final int threadIdx) {
+        final StreamThread streamThread = StreamThread.create(
+                internalTopologyBuilder,
+                config,
+                clientSupplier,
+                adminClient,
+                processId,
+                clientId,
+                streamsMetrics,
+                time,
+                streamsMetadataState,
+                cacheSizePerThread,
+                stateDirectory,
+                delegatingStateRestoreListener,
+                threadIdx,
+                KafkaStreams.this::closeToError,
+                streamsUncaughtExceptionHandler
+        );
+        threads.add(streamThread);
+        threadState.put(streamThread.getId(), streamThread.state());
+        storeProviders.add(new StreamThreadStateStoreProvider(streamThread));
+        return streamThread;
+    }
+
+    /**
+     * Adds and starts a stream thread in addition to the stream threads that are already running in this
+     * Kafka Streams client.
+     *

Review comment:
       Add missing `<p>` tag

##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -894,19 +885,88 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder,
         queryableStoreProvider = new QueryableStoreProvider(storeProviders, globalStateStoreProvider);
 
         stateDirCleaner = setupStateDirCleaner();
-        oldHandler = false;
         maybeWarnAboutCodeInRocksDBConfigSetter(log, config);
         rocksDBMetricsRecordingService = maybeCreateRocksDBMetricsRecordingService(clientId, config);
     }
 
+    private StreamThread createStreamThread(final long cacheSizePerThread, final int threadIdx) {
+        final StreamThread streamThread = StreamThread.create(
+                internalTopologyBuilder,
+                config,
+                clientSupplier,
+                adminClient,
+                processId,
+                clientId,
+                streamsMetrics,
+                time,
+                streamsMetadataState,
+                cacheSizePerThread,
+                stateDirectory,
+                delegatingStateRestoreListener,
+                threadIdx,
+                KafkaStreams.this::closeToError,
+                streamsUncaughtExceptionHandler
+        );
+        threads.add(streamThread);
+        threadState.put(streamThread.getId(), streamThread.state());
+        storeProviders.add(new StreamThreadStateStoreProvider(streamThread));
+        return streamThread;
+    }
+
+    /**
+     * Adds and starts a stream thread in addition to the stream threads that are already running in this
+     * Kafka Streams client.
+     *
+     * Since the number of stream threads increases, the sizes of the caches in the new stream thread
+     * and the existing stream threads are adapted so that the sum of the cache sizes over all stream
+     * threads does not exceed the total cache size specified in configuration
+     * {@code cache.max.bytes.buffering}.
+     *
+     * Stream threads can only be added if this Kafka Streams client is in state RUNNING or REBALANCING.
+     *
+     * @return name of the added stream thread or empty if a new stream thread could not be added
+     */
+    public Optional<String> addStreamThread() {
+        if (isRunningOrRebalancing()) {
+            final int threadIdx = getNextThreadIndex();
+            final long cacheSizePerThread = getCacheSizePerThread(threads.size() + 1);
+            resizeThreadCache(cacheSizePerThread);
+            final StreamThread streamThread = createStreamThread(cacheSizePerThread, threadIdx);
+            streamThread.setStateListener(streamStateListener);
+            synchronized (stateLock) {
+                if (isRunningOrRebalancing()) {
+                    streamThread.start();
+                    return Optional.of(streamThread.getName());
+                } else {
+                    return Optional.empty();
+                }
+            }
+        } else {
+            return Optional.empty();
+        }
+    }
+
+    private int getNextThreadIndex() {
+        final HashSet<String> names = new HashSet<>();

Review comment:
       Why do we compute the names from scratch, but not incrementally maintain them as member variable?

##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -894,19 +885,88 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder,
         queryableStoreProvider = new QueryableStoreProvider(storeProviders, globalStateStoreProvider);
 
         stateDirCleaner = setupStateDirCleaner();
-        oldHandler = false;
         maybeWarnAboutCodeInRocksDBConfigSetter(log, config);
         rocksDBMetricsRecordingService = maybeCreateRocksDBMetricsRecordingService(clientId, config);
     }
 
+    private StreamThread createStreamThread(final long cacheSizePerThread, final int threadIdx) {
+        final StreamThread streamThread = StreamThread.create(
+                internalTopologyBuilder,
+                config,
+                clientSupplier,
+                adminClient,
+                processId,
+                clientId,
+                streamsMetrics,
+                time,
+                streamsMetadataState,
+                cacheSizePerThread,
+                stateDirectory,
+                delegatingStateRestoreListener,
+                threadIdx,
+                KafkaStreams.this::closeToError,
+                streamsUncaughtExceptionHandler
+        );
+        threads.add(streamThread);
+        threadState.put(streamThread.getId(), streamThread.state());
+        storeProviders.add(new StreamThreadStateStoreProvider(streamThread));
+        return streamThread;
+    }
+
+    /**
+     * Adds and starts a stream thread in addition to the stream threads that are already running in this
+     * Kafka Streams client.
+     *
+     * Since the number of stream threads increases, the sizes of the caches in the new stream thread
+     * and the existing stream threads are adapted so that the sum of the cache sizes over all stream
+     * threads does not exceed the total cache size specified in configuration
+     * {@code cache.max.bytes.buffering}.
+     *
+     * Stream threads can only be added if this Kafka Streams client is in state RUNNING or REBALANCING.
+     *
+     * @return name of the added stream thread or empty if a new stream thread could not be added
+     */
+    public Optional<String> addStreamThread() {
+        if (isRunningOrRebalancing()) {
+            final int threadIdx = getNextThreadIndex();
+            final long cacheSizePerThread = getCacheSizePerThread(threads.size() + 1);
+            resizeThreadCache(cacheSizePerThread);
+            final StreamThread streamThread = createStreamThread(cacheSizePerThread, threadIdx);
+            streamThread.setStateListener(streamStateListener);
+            synchronized (stateLock) {
+                if (isRunningOrRebalancing()) {
+                    streamThread.start();

Review comment:
       Just to clarify for myself: if we don't `start()` the thread, no harm is done creating it? Or would we need to do some cleanup even if we don't start the thread?

##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -894,19 +885,88 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder,
         queryableStoreProvider = new QueryableStoreProvider(storeProviders, globalStateStoreProvider);
 
         stateDirCleaner = setupStateDirCleaner();
-        oldHandler = false;
         maybeWarnAboutCodeInRocksDBConfigSetter(log, config);
         rocksDBMetricsRecordingService = maybeCreateRocksDBMetricsRecordingService(clientId, config);
     }
 
+    private StreamThread createStreamThread(final long cacheSizePerThread, final int threadIdx) {
+        final StreamThread streamThread = StreamThread.create(
+                internalTopologyBuilder,
+                config,
+                clientSupplier,
+                adminClient,
+                processId,
+                clientId,
+                streamsMetrics,
+                time,
+                streamsMetadataState,
+                cacheSizePerThread,
+                stateDirectory,
+                delegatingStateRestoreListener,
+                threadIdx,
+                KafkaStreams.this::closeToError,
+                streamsUncaughtExceptionHandler
+        );
+        threads.add(streamThread);
+        threadState.put(streamThread.getId(), streamThread.state());
+        storeProviders.add(new StreamThreadStateStoreProvider(streamThread));
+        return streamThread;
+    }
+
+    /**
+     * Adds and starts a stream thread in addition to the stream threads that are already running in this
+     * Kafka Streams client.
+     *
+     * Since the number of stream threads increases, the sizes of the caches in the new stream thread
+     * and the existing stream threads are adapted so that the sum of the cache sizes over all stream
+     * threads does not exceed the total cache size specified in configuration
+     * {@code cache.max.bytes.buffering}.
+     *

Review comment:
       As above.

##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -894,19 +885,88 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder,
         queryableStoreProvider = new QueryableStoreProvider(storeProviders, globalStateStoreProvider);
 
         stateDirCleaner = setupStateDirCleaner();
-        oldHandler = false;
         maybeWarnAboutCodeInRocksDBConfigSetter(log, config);
         rocksDBMetricsRecordingService = maybeCreateRocksDBMetricsRecordingService(clientId, config);
     }
 
+    private StreamThread createStreamThread(final long cacheSizePerThread, final int threadIdx) {
+        final StreamThread streamThread = StreamThread.create(
+                internalTopologyBuilder,
+                config,
+                clientSupplier,
+                adminClient,
+                processId,
+                clientId,
+                streamsMetrics,
+                time,
+                streamsMetadataState,
+                cacheSizePerThread,
+                stateDirectory,
+                delegatingStateRestoreListener,
+                threadIdx,
+                KafkaStreams.this::closeToError,
+                streamsUncaughtExceptionHandler
+        );
+        threads.add(streamThread);
+        threadState.put(streamThread.getId(), streamThread.state());
+        storeProviders.add(new StreamThreadStateStoreProvider(streamThread));
+        return streamThread;
+    }
+
+    /**
+     * Adds and starts a stream thread in addition to the stream threads that are already running in this
+     * Kafka Streams client.
+     *
+     * Since the number of stream threads increases, the sizes of the caches in the new stream thread
+     * and the existing stream threads are adapted so that the sum of the cache sizes over all stream
+     * threads does not exceed the total cache size specified in configuration
+     * {@code cache.max.bytes.buffering}.

Review comment:
       Should we link to `StreamConfig` instead?

##########
File path: streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
##########
@@ -588,6 +592,25 @@ public void testCloseIsIdempotent() {
             closeCount, MockMetricsReporter.CLOSE_COUNT.get());
     }
 
+    @Test
+    public void shouldAddThread() throws InterruptedException {

Review comment:
       `shouldAddThreadWhenRunning`




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ableegoldman commented on a change in pull request #9615: KAFKA-10500: Add thread option

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #9615:
URL: https://github.com/apache/kafka/pull/9615#discussion_r533878767



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -870,43 +899,73 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder,
                 cacheSizePerThread,
                 stateDirectory,
                 delegatingStateRestoreListener,
-                i + 1,
+                threadIdx,
                 KafkaStreams.this::closeToError,
-                this::defaultStreamsUncaughtExceptionHandler
-            );
-            threads.add(streamThread);
-            threadState.put(streamThread.getId(), streamThread.state());
-            storeProviders.add(new StreamThreadStateStoreProvider(streamThread));
-        }
+                streamsUncaughtExceptionHandler
+        );
+        streamThread.setStateListener(streamStateListener);
+        threads.add(streamThread);
+        threadState.put(streamThread.getId(), streamThread.state());
+        storeProviders.add(new StreamThreadStateStoreProvider(streamThread));
+        return streamThread;
+    }
 
-        ClientMetrics.addNumAliveStreamThreadMetric(streamsMetrics, (metricsConfig, now) ->
-            Math.toIntExact(threads.stream().filter(thread -> thread.state().isAlive()).count()));
+    /**
+     * Adds and starts a stream thread in addition to the stream threads that are already running in this
+     * Kafka Streams client.
+     * <p>
+     * Since the number of stream threads increases, the sizes of the caches in the new stream thread
+     * and the existing stream threads are adapted so that the sum of the cache sizes over all stream
+     * threads does not exceed the total cache size specified in configuration
+     * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG}.
+     * <p>
+     * Stream threads can only be added if this Kafka Streams client is in state RUNNING or REBALANCING.
+     *
+     * @return name of the added stream thread or empty if a new stream thread could not be added
+     */
+    public Optional<String> addStreamThread() {
+        if (isRunningOrRebalancing()) {
+            final int threadIdx = getNextThreadIndex();
+            final long cacheSizePerThread = getCacheSizePerThread(threads.size() + 1);
+            resizeThreadCache(cacheSizePerThread);
+            final StreamThread streamThread = createStreamThread(cacheSizePerThread, threadIdx);
+            synchronized (stateLock) {
+                if (isRunningOrRebalancing()) {
+                    streamThread.start();
+                    return Optional.of(streamThread.getName());
+                } else {
+                    threads.remove(streamThread);

Review comment:
       We should also shutdown the thread if it doesn't get started, otherwise me may leak (consumer or producer) clients. But I'm actually not sure why we don't just do everything (resize cache, create thread) inside the synchronized block? I'm guessing it would deadlock due to locking on the `statelock` but can't we just synchronize on something else that wouldn't interfere with the StreamThread creation?

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java
##########
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.processor.ThreadMetadata;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.StreamsTestUtils;
+import org.apache.kafka.test.TestUtils;
+import org.hamcrest.CoreMatchers;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+import java.io.IOException;
+import java.util.Optional;
+import java.util.Properties;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.apache.kafka.common.utils.Utils.mkObjectProperties;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.purgeLocalStreamsState;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+@Category(IntegrationTest.class)
+public class AdjustStreamThreadCountTest {
+
+    @ClassRule
+    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);
+
+    @Rule
+    public TestName testName = new TestName();
+
+    private static String inputTopic;
+    private static StreamsBuilder builder;
+    private static Properties properties;
+    private static String appId = "";
+
+    @Before
+    public void setup() {
+        final String testId = safeUniqueTestName(getClass(), testName);
+        appId = "appId_" + testId;
+        inputTopic = "input" + testId;
+        IntegrationTestUtils.cleanStateBeforeTest(CLUSTER, inputTopic);
+
+        builder  = new StreamsBuilder();
+        builder.stream(inputTopic);
+
+        properties  = mkObjectProperties(
+            mkMap(
+                mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()),
+                mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, appId),
+                mkEntry(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()),
+                mkEntry(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2),
+                mkEntry(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class),
+                mkEntry(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class)
+            )
+        );
+    }
+
+    @After
+    public void teardown() throws IOException {
+        purgeLocalStreamsState(properties);
+    }
+
+    @Test
+    public void shouldAddStreamThread() throws Exception {
+        try (final KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), properties)) {
+            StreamsTestUtils.startKafkaStreamsAndWaitForRunningState(kafkaStreams);
+            final int oldThreadCount = kafkaStreams.localThreadsMetadata().size();
+
+            final Optional<String> name = kafkaStreams.addStreamThread();
+
+            assertThat(name, CoreMatchers.not(Optional.empty()));
+            TestUtils.waitForCondition(
+                () -> kafkaStreams.localThreadsMetadata().stream().sequential()
+                        .map(ThreadMetadata::threadName).anyMatch(t -> t.equals(name.orElse(""))),
+                "Wait for the thread to be added"
+            );
+            assertThat(kafkaStreams.localThreadsMetadata().size(), equalTo(oldThreadCount + 1));
+        }

Review comment:
       Can we also assert that the state gets to `RUNNING` after the new thread has joined




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] wcarlson5 commented on a change in pull request #9615: KAFKA-10500: Add thread option

Posted by GitBox <gi...@apache.org>.
wcarlson5 commented on a change in pull request #9615:
URL: https://github.com/apache/kafka/pull/9615#discussion_r534313845



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -870,43 +899,73 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder,
                 cacheSizePerThread,
                 stateDirectory,
                 delegatingStateRestoreListener,
-                i + 1,
+                threadIdx,
                 KafkaStreams.this::closeToError,
-                this::defaultStreamsUncaughtExceptionHandler
-            );
-            threads.add(streamThread);
-            threadState.put(streamThread.getId(), streamThread.state());
-            storeProviders.add(new StreamThreadStateStoreProvider(streamThread));
-        }
+                streamsUncaughtExceptionHandler
+        );
+        streamThread.setStateListener(streamStateListener);
+        threads.add(streamThread);
+        threadState.put(streamThread.getId(), streamThread.state());
+        storeProviders.add(new StreamThreadStateStoreProvider(streamThread));
+        return streamThread;
+    }
 
-        ClientMetrics.addNumAliveStreamThreadMetric(streamsMetrics, (metricsConfig, now) ->
-            Math.toIntExact(threads.stream().filter(thread -> thread.state().isAlive()).count()));
+    /**
+     * Adds and starts a stream thread in addition to the stream threads that are already running in this
+     * Kafka Streams client.
+     * <p>
+     * Since the number of stream threads increases, the sizes of the caches in the new stream thread
+     * and the existing stream threads are adapted so that the sum of the cache sizes over all stream
+     * threads does not exceed the total cache size specified in configuration
+     * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG}.
+     * <p>
+     * Stream threads can only be added if this Kafka Streams client is in state RUNNING or REBALANCING.
+     *
+     * @return name of the added stream thread or empty if a new stream thread could not be added
+     */
+    public Optional<String> addStreamThread() {
+        if (isRunningOrRebalancing()) {
+            final int threadIdx = getNextThreadIndex();
+            final long cacheSizePerThread = getCacheSizePerThread(threads.size() + 1);
+            resizeThreadCache(cacheSizePerThread);
+            final StreamThread streamThread = createStreamThread(cacheSizePerThread, threadIdx);
+            synchronized (stateLock) {
+                if (isRunningOrRebalancing()) {
+                    streamThread.start();
+                    return Optional.of(streamThread.getName());
+                } else {
+                    threads.remove(streamThread);

Review comment:
       Unfortunately I don't think we can shutdown a thread until we have started it.
   
    I don't think there should be a dead lock by just using the state lock around most of the method. It as mostly about cost. However I think that its is probably the safest way as it solves our problem about needing to remove a thread we have just created and we won't potentially waste time resizing the cache as such.
   
   If we synchronize on a new lock we end up with all the problems we were trying to solve earlier anyways with the possible state changes and having to clean up un-started threads




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] cadonna commented on a change in pull request #9615: KAFKA-10500: Add thread option

Posted by GitBox <gi...@apache.org>.
cadonna commented on a change in pull request #9615:
URL: https://github.com/apache/kafka/pull/9615#discussion_r527055689



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -894,11 +904,57 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder,
         queryableStoreProvider = new QueryableStoreProvider(storeProviders, globalStateStoreProvider);
 
         stateDirCleaner = setupStateDirCleaner();
-        oldHandler = false;
         maybeWarnAboutCodeInRocksDBConfigSetter(log, config);
         rocksDBMetricsRecordingService = maybeCreateRocksDBMetricsRecordingService(clientId, config);
     }
 
+    /**
+     * Adds and starts a stream thread in addition to the stream threads that are already running in this
+     * Kafka Streams client.
+     *
+     * Since the number of stream threads increases, the sizes of the caches in the new stream thread
+     * and the existing stream threads are adapted so that the sum of the cache sizes over all stream
+     * threads does not exceed the total cache size specified in configuration
+     * {@code cache.max.bytes.buffering}.
+     *
+     * Stream threads can only be added if this Kafka Streams client is in state RUNNING or REBALANCING.
+     *
+     * @return name of the added stream thread or empty if a new stream thread could not be added
+     */
+    public Optional<String> addStreamThread() {
+        synchronized (stateLock) {
+            if (state == State.RUNNING || state == State.REBALANCING) {
+                final int threadIdx = threads.size() + 1;
+                final long cacheSizePerThread = getCacheSizePerThread(threadIdx);

Review comment:
       This should be:
   ```suggestion
                   final long cacheSizePerThread = getCacheSizePerThread(threads.size() + 1);
   ```

##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -894,11 +904,57 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder,
         queryableStoreProvider = new QueryableStoreProvider(storeProviders, globalStateStoreProvider);
 
         stateDirCleaner = setupStateDirCleaner();
-        oldHandler = false;
         maybeWarnAboutCodeInRocksDBConfigSetter(log, config);
         rocksDBMetricsRecordingService = maybeCreateRocksDBMetricsRecordingService(clientId, config);
     }
 
+    /**
+     * Adds and starts a stream thread in addition to the stream threads that are already running in this
+     * Kafka Streams client.
+     *
+     * Since the number of stream threads increases, the sizes of the caches in the new stream thread
+     * and the existing stream threads are adapted so that the sum of the cache sizes over all stream
+     * threads does not exceed the total cache size specified in configuration
+     * {@code cache.max.bytes.buffering}.
+     *
+     * Stream threads can only be added if this Kafka Streams client is in state RUNNING or REBALANCING.
+     *
+     * @return name of the added stream thread or empty if a new stream thread could not be added
+     */
+    public Optional<String> addStreamThread() {
+        synchronized (stateLock) {
+            if (state == State.RUNNING || state == State.REBALANCING) {
+                final int threadIdx = threads.size() + 1;

Review comment:
       Assume the following thread list [t2, t3, t4], `threadIdx` would be 4, which is already there. You should keep the currently used `threadIdx`s and check those to decide on the next `threadIdx`.
   
   

##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -894,11 +904,57 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder,
         queryableStoreProvider = new QueryableStoreProvider(storeProviders, globalStateStoreProvider);
 
         stateDirCleaner = setupStateDirCleaner();
-        oldHandler = false;
         maybeWarnAboutCodeInRocksDBConfigSetter(log, config);
         rocksDBMetricsRecordingService = maybeCreateRocksDBMetricsRecordingService(clientId, config);
     }
 
+    /**
+     * Adds and starts a stream thread in addition to the stream threads that are already running in this
+     * Kafka Streams client.
+     *
+     * Since the number of stream threads increases, the sizes of the caches in the new stream thread
+     * and the existing stream threads are adapted so that the sum of the cache sizes over all stream
+     * threads does not exceed the total cache size specified in configuration
+     * {@code cache.max.bytes.buffering}.
+     *
+     * Stream threads can only be added if this Kafka Streams client is in state RUNNING or REBALANCING.
+     *
+     * @return name of the added stream thread or empty if a new stream thread could not be added
+     */
+    public Optional<String> addStreamThread() {
+        synchronized (stateLock) {
+            if (state == State.RUNNING || state == State.REBALANCING) {
+                final int threadIdx = threads.size() + 1;
+                final long cacheSizePerThread = getCacheSizePerThread(threadIdx);
+                resizeThreadCache(threadIdx);
+                final StreamThread streamThread = StreamThread.create(
+                        internalTopologyBuilder,
+                        config,
+                        clientSupplier,
+                        adminClient,
+                        processId,
+                        clientId,
+                        streamsMetrics,
+                        time,
+                        streamsMetadataState,
+                        cacheSizePerThread,
+                        stateDirectory,
+                        delegatingStateRestoreListener,
+                        threadIdx,
+                        KafkaStreams.this::closeToError,
+                        streamsUncaughtExceptionHandler
+                );
+                threads.add(streamThread);
+                threadState.put(streamThread.getId(), streamThread.state());
+                storeProviders.add(new StreamThreadStateStoreProvider(streamThread));
+                streamThread.setStateListener(streamStateListener);
+                return Optional.of(streamThread.getName());
+            } else {
+                return Optional.empty();
+            }
+        }
+    }

Review comment:
       Where is the stream thread started?

##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -894,11 +904,57 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder,
         queryableStoreProvider = new QueryableStoreProvider(storeProviders, globalStateStoreProvider);
 
         stateDirCleaner = setupStateDirCleaner();
-        oldHandler = false;
         maybeWarnAboutCodeInRocksDBConfigSetter(log, config);
         rocksDBMetricsRecordingService = maybeCreateRocksDBMetricsRecordingService(clientId, config);
     }
 
+    /**
+     * Adds and starts a stream thread in addition to the stream threads that are already running in this
+     * Kafka Streams client.
+     *
+     * Since the number of stream threads increases, the sizes of the caches in the new stream thread
+     * and the existing stream threads are adapted so that the sum of the cache sizes over all stream
+     * threads does not exceed the total cache size specified in configuration
+     * {@code cache.max.bytes.buffering}.
+     *
+     * Stream threads can only be added if this Kafka Streams client is in state RUNNING or REBALANCING.
+     *
+     * @return name of the added stream thread or empty if a new stream thread could not be added
+     */
+    public Optional<String> addStreamThread() {
+        synchronized (stateLock) {
+            if (state == State.RUNNING || state == State.REBALANCING) {
+                final int threadIdx = threads.size() + 1;
+                final long cacheSizePerThread = getCacheSizePerThread(threadIdx);
+                resizeThreadCache(threadIdx);

Review comment:
       ```suggestion
                   resizeThreadCache(threads.size() + 1);
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] cadonna commented on a change in pull request #9615: KAFKA-10500: Add thread option

Posted by GitBox <gi...@apache.org>.
cadonna commented on a change in pull request #9615:
URL: https://github.com/apache/kafka/pull/9615#discussion_r527145996



##########
File path: streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
##########
@@ -588,6 +592,29 @@ public void testCloseIsIdempotent() {
             closeCount, MockMetricsReporter.CLOSE_COUNT.get());
     }
 
+    @Test
+    public void testAddThread() {
+        props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1);
+        final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time);
+        streams.start();
+        final int oldSize = streams.threads.size();
+        try {
+            TestUtils.waitForCondition(() -> streams.state() == KafkaStreams.State.RUNNING, 15L, "wait until running");
+        } catch (final InterruptedException e) {
+            e.printStackTrace();
+        }

Review comment:
       You should not use `try-catch` here but just add `throws InterruptedException` to the method signature.

##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -894,11 +904,72 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder,
         queryableStoreProvider = new QueryableStoreProvider(storeProviders, globalStateStoreProvider);
 
         stateDirCleaner = setupStateDirCleaner();
-        oldHandler = false;
         maybeWarnAboutCodeInRocksDBConfigSetter(log, config);
         rocksDBMetricsRecordingService = maybeCreateRocksDBMetricsRecordingService(clientId, config);
     }
 
+    /**
+     * Adds and starts a stream thread in addition to the stream threads that are already running in this
+     * Kafka Streams client.
+     *
+     * Since the number of stream threads increases, the sizes of the caches in the new stream thread
+     * and the existing stream threads are adapted so that the sum of the cache sizes over all stream
+     * threads does not exceed the total cache size specified in configuration
+     * {@code cache.max.bytes.buffering}.
+     *
+     * Stream threads can only be added if this Kafka Streams client is in state RUNNING or REBALANCING.
+     *
+     * @return name of the added stream thread or empty if a new stream thread could not be added
+     */
+    public Optional<String> addStreamThread() {
+        synchronized (stateLock) {

Review comment:
       Why do we need to synchronize the whole method on `stateLock`?

##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -894,11 +904,72 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder,
         queryableStoreProvider = new QueryableStoreProvider(storeProviders, globalStateStoreProvider);
 
         stateDirCleaner = setupStateDirCleaner();
-        oldHandler = false;
         maybeWarnAboutCodeInRocksDBConfigSetter(log, config);
         rocksDBMetricsRecordingService = maybeCreateRocksDBMetricsRecordingService(clientId, config);
     }
 
+    /**
+     * Adds and starts a stream thread in addition to the stream threads that are already running in this
+     * Kafka Streams client.
+     *
+     * Since the number of stream threads increases, the sizes of the caches in the new stream thread
+     * and the existing stream threads are adapted so that the sum of the cache sizes over all stream
+     * threads does not exceed the total cache size specified in configuration
+     * {@code cache.max.bytes.buffering}.
+     *
+     * Stream threads can only be added if this Kafka Streams client is in state RUNNING or REBALANCING.
+     *
+     * @return name of the added stream thread or empty if a new stream thread could not be added
+     */
+    public Optional<String> addStreamThread() {
+        synchronized (stateLock) {
+            if (state == State.RUNNING || state == State.REBALANCING) {

Review comment:
       Could we also use `isRunningOrRebalancing()` here?

##########
File path: streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
##########
@@ -588,6 +592,29 @@ public void testCloseIsIdempotent() {
             closeCount, MockMetricsReporter.CLOSE_COUNT.get());
     }
 
+    @Test
+    public void testAddThread() {

Review comment:
       I would prefer to use `shouldAddThread()` as name although the pattern is different for the other test methods.

##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -894,11 +904,72 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder,
         queryableStoreProvider = new QueryableStoreProvider(storeProviders, globalStateStoreProvider);
 
         stateDirCleaner = setupStateDirCleaner();
-        oldHandler = false;
         maybeWarnAboutCodeInRocksDBConfigSetter(log, config);
         rocksDBMetricsRecordingService = maybeCreateRocksDBMetricsRecordingService(clientId, config);
     }
 
+    /**
+     * Adds and starts a stream thread in addition to the stream threads that are already running in this
+     * Kafka Streams client.
+     *
+     * Since the number of stream threads increases, the sizes of the caches in the new stream thread
+     * and the existing stream threads are adapted so that the sum of the cache sizes over all stream
+     * threads does not exceed the total cache size specified in configuration
+     * {@code cache.max.bytes.buffering}.
+     *
+     * Stream threads can only be added if this Kafka Streams client is in state RUNNING or REBALANCING.
+     *
+     * @return name of the added stream thread or empty if a new stream thread could not be added
+     */
+    public Optional<String> addStreamThread() {
+        synchronized (stateLock) {
+            if (state == State.RUNNING || state == State.REBALANCING) {
+                final int threadIdx = getNextThreadIndex();
+                final long cacheSizePerThread = getCacheSizePerThread(threads.size() + 1);
+                resizeThreadCache(threads.size() + 1);
+                final StreamThread streamThread = StreamThread.create(
+                        internalTopologyBuilder,
+                        config,
+                        clientSupplier,
+                        adminClient,
+                        processId,
+                        clientId,
+                        streamsMetrics,
+                        time,
+                        streamsMetadataState,
+                        cacheSizePerThread,
+                        stateDirectory,
+                        delegatingStateRestoreListener,
+                        threadIdx,
+                        KafkaStreams.this::closeToError,
+                        streamsUncaughtExceptionHandler
+                );
+                threads.add(streamThread);
+                threadState.put(streamThread.getId(), streamThread.state());
+                storeProviders.add(new StreamThreadStateStoreProvider(streamThread));

Review comment:
       We do something really similar when we start the stream threads at startup. Could you try to extract this part to a method?

##########
File path: streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
##########
@@ -588,6 +592,29 @@ public void testCloseIsIdempotent() {
             closeCount, MockMetricsReporter.CLOSE_COUNT.get());
     }
 
+    @Test
+    public void testAddThread() {
+        props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1);
+        final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time);
+        streams.start();
+        final int oldSize = streams.threads.size();
+        try {
+            TestUtils.waitForCondition(() -> streams.state() == KafkaStreams.State.RUNNING, 15L, "wait until running");
+        } catch (final InterruptedException e) {
+            e.printStackTrace();
+        }
+        assertThat(streams.addStreamThread(), equalTo(Optional.of("newThread")));
+        assertThat(streams.threads.size(), equalTo(oldSize + 1));
+    }
+
+    @Test
+    public void testAddThreadNotDuringStart() {

Review comment:
       I would prefer to use `shouldNotAddThread()` as name although the pattern is different for the other test methods.

##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -894,11 +904,72 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder,
         queryableStoreProvider = new QueryableStoreProvider(storeProviders, globalStateStoreProvider);
 
         stateDirCleaner = setupStateDirCleaner();
-        oldHandler = false;
         maybeWarnAboutCodeInRocksDBConfigSetter(log, config);
         rocksDBMetricsRecordingService = maybeCreateRocksDBMetricsRecordingService(clientId, config);
     }
 
+    /**
+     * Adds and starts a stream thread in addition to the stream threads that are already running in this
+     * Kafka Streams client.
+     *
+     * Since the number of stream threads increases, the sizes of the caches in the new stream thread
+     * and the existing stream threads are adapted so that the sum of the cache sizes over all stream
+     * threads does not exceed the total cache size specified in configuration
+     * {@code cache.max.bytes.buffering}.
+     *
+     * Stream threads can only be added if this Kafka Streams client is in state RUNNING or REBALANCING.
+     *
+     * @return name of the added stream thread or empty if a new stream thread could not be added
+     */
+    public Optional<String> addStreamThread() {

Review comment:
       Additionally to the unit tests that you wrote, I think we also need integration tests.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] cadonna commented on a change in pull request #9615: KAFKA-10500: Add thread option

Posted by GitBox <gi...@apache.org>.
cadonna commented on a change in pull request #9615:
URL: https://github.com/apache/kafka/pull/9615#discussion_r533687031



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -894,19 +885,88 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder,
         queryableStoreProvider = new QueryableStoreProvider(storeProviders, globalStateStoreProvider);
 
         stateDirCleaner = setupStateDirCleaner();
-        oldHandler = false;
         maybeWarnAboutCodeInRocksDBConfigSetter(log, config);
         rocksDBMetricsRecordingService = maybeCreateRocksDBMetricsRecordingService(clientId, config);
     }
 
+    private StreamThread createStreamThread(final long cacheSizePerThread, final int threadIdx) {
+        final StreamThread streamThread = StreamThread.create(
+                internalTopologyBuilder,
+                config,
+                clientSupplier,
+                adminClient,
+                processId,
+                clientId,
+                streamsMetrics,
+                time,
+                streamsMetadataState,
+                cacheSizePerThread,
+                stateDirectory,
+                delegatingStateRestoreListener,
+                threadIdx,
+                KafkaStreams.this::closeToError,
+                streamsUncaughtExceptionHandler
+        );
+        threads.add(streamThread);
+        threadState.put(streamThread.getId(), streamThread.state());
+        storeProviders.add(new StreamThreadStateStoreProvider(streamThread));
+        return streamThread;
+    }
+
+    /**
+     * Adds and starts a stream thread in addition to the stream threads that are already running in this
+     * Kafka Streams client.
+     *
+     * Since the number of stream threads increases, the sizes of the caches in the new stream thread
+     * and the existing stream threads are adapted so that the sum of the cache sizes over all stream
+     * threads does not exceed the total cache size specified in configuration
+     * {@code cache.max.bytes.buffering}.
+     *
+     * Stream threads can only be added if this Kafka Streams client is in state RUNNING or REBALANCING.
+     *
+     * @return name of the added stream thread or empty if a new stream thread could not be added
+     */
+    public Optional<String> addStreamThread() {
+        if (isRunningOrRebalancing()) {
+            final int threadIdx = getNextThreadIndex();
+            final long cacheSizePerThread = getCacheSizePerThread(threads.size() + 1);
+            resizeThreadCache(cacheSizePerThread);
+            final StreamThread streamThread = createStreamThread(cacheSizePerThread, threadIdx);
+            streamThread.setStateListener(streamStateListener);
+            synchronized (stateLock) {
+                if (isRunningOrRebalancing()) {
+                    streamThread.start();

Review comment:
       Yes, currently this assumption is correct, but if the state transitions change in future, we would be safe if we do the cleanup.
   On a second thought, we are probably not 100% safe because if a transition from `NOT_RUNNING` to `RUNNING` is added (or any other transition that goes from the above mentioned states to `RUNNING` or `REBALANCING`), we would still not do the clean up. 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] wcarlson5 commented on pull request #9615: KAFKA-10500: Add thread option

Posted by GitBox <gi...@apache.org>.
wcarlson5 commented on pull request #9615:
URL: https://github.com/apache/kafka/pull/9615#issuecomment-730500854


   @cadonna @mjsax @vvcephei Part 3 of KIP-663 add threads is ready for review


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] wcarlson5 commented on a change in pull request #9615: KAFKA-10500: Add thread option

Posted by GitBox <gi...@apache.org>.
wcarlson5 commented on a change in pull request #9615:
URL: https://github.com/apache/kafka/pull/9615#discussion_r535503453



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -870,43 +900,75 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder,
                 cacheSizePerThread,
                 stateDirectory,
                 delegatingStateRestoreListener,
-                i + 1,
+                threadIdx,
                 KafkaStreams.this::closeToError,
-                this::defaultStreamsUncaughtExceptionHandler
-            );
-            threads.add(streamThread);
-            threadState.put(streamThread.getId(), streamThread.state());
-            storeProviders.add(new StreamThreadStateStoreProvider(streamThread));
-        }
+                streamsUncaughtExceptionHandler
+        );
+        streamThread.setStateListener(streamStateListener);
+        threads.add(streamThread);
+        threadState.put(streamThread.getId(), streamThread.state());
+        storeProviders.add(new StreamThreadStateStoreProvider(streamThread));
+        return streamThread;
+    }
 
-        ClientMetrics.addNumAliveStreamThreadMetric(streamsMetrics, (metricsConfig, now) ->
-            Math.toIntExact(threads.stream().filter(thread -> thread.state().isAlive()).count()));
+    /**
+     * Adds and starts a stream thread in addition to the stream threads that are already running in this
+     * Kafka Streams client.
+     * <p>
+     * Since the number of stream threads increases, the sizes of the caches in the new stream thread
+     * and the existing stream threads are adapted so that the sum of the cache sizes over all stream
+     * threads does not exceed the total cache size specified in configuration
+     * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG}.
+     * <p>
+     * Stream threads can only be added if this Kafka Streams client is in state RUNNING or REBALANCING.
+     *
+     * @return name of the added stream thread or empty if a new stream thread could not be added
+     */
+    public Optional<String> addStreamThread() {
+        synchronized (newThread) {
+            if (isRunningOrRebalancing()) {
+                final int threadIdx = getNextThreadIndex();
+                final long cacheSizePerThread = getCacheSizePerThread(threads.size() + 1);
+                resizeThreadCache(cacheSizePerThread);
+                final StreamThread streamThread = createStreamThread(cacheSizePerThread, threadIdx);
+                synchronized (stateLock) {

Review comment:
       Expanding on this, the problem in the shutdown thread. When the join only waits for alive threads, and to be alive the thread needs to be started. 
   
   So if in between the check and the start thread another thread transitions the state to NOT_RUNNING the thread will not join in the shutdown thread. Then when it continues it will start as it passed the check and we will have a thread running after the client is shutdown.
   
   This would be extremely though race condition to find or reproduce so best to just avoid it.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org