You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/11/04 16:06:38 UTC

[GitHub] [kafka] cadonna commented on a change in pull request #9543: KAFKA-10500: Makes the Stream thread list resizable

cadonna commented on a change in pull request #9543:
URL: https://github.com/apache/kafka/pull/9543#discussion_r517426911



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -145,7 +145,7 @@
     private final String clientId;
     private final Metrics metrics;
     private final StreamsConfig config;
-    protected final StreamThread[] threads;
+    protected final ArrayList<StreamThread> threads;

Review comment:
       I would prefer to use `List` instead of `ArrayList` to be more generic.

##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -719,8 +718,7 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder,
         }
 
         // create the stream thread, global update thread, and cleanup thread
-        threads = new StreamThread[numStreamThreads];
-
+        threads = new ArrayList<>(numStreamThreads);

Review comment:
       I think, it is better to keep the default initial capacity of an `ArrayList`. Otherwise, the first time a stream thread is added, we immediately run into a memory allocation. Since we do not know how many stream thread we might expect, let's use the default.   
   We could also consider using a `LinkedList` since we never access by index in production code.

##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -783,12 +781,13 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder,
                 stateDirectory,
                 delegatingStateRestoreListener,
                 i + 1);
-            threadState.put(threads[i].getId(), threads[i].state());
-            storeProviders.add(new StreamThreadStateStoreProvider(threads[i]));
+            threads.add(i, streamThread);
+            threadState.put(threads.get(i).getId(), threads.get(i).state());
+            storeProviders.add(new StreamThreadStateStoreProvider(threads.get(i)));
         }
 
         ClientMetrics.addNumAliveStreamThreadMetric(streamsMetrics, (metricsConfig, now) ->
-            Math.toIntExact(Arrays.stream(threads).filter(thread -> thread.state().isAlive()).count()));
+            Math.toIntExact(Arrays.stream(threads.toArray(new StreamThread[numStreamThreads])).filter(thread -> thread.state().isAlive()).count()));

Review comment:
       Please simplify to
   ```suggestion
               Math.toIntExact(threads.stream().filter(thread -> thread.state().isAlive()).count()));
   ```

##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -783,12 +781,13 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder,
                 stateDirectory,
                 delegatingStateRestoreListener,
                 i + 1);
-            threadState.put(threads[i].getId(), threads[i].state());
-            storeProviders.add(new StreamThreadStateStoreProvider(threads[i]));
+            threads.add(i, streamThread);
+            threadState.put(threads.get(i).getId(), threads.get(i).state());
+            storeProviders.add(new StreamThreadStateStoreProvider(threads.get(i)));

Review comment:
       You can simplify to
   ```suggestion
               threads.add(streamThread);
               threadState.put(streamThread.getId(), streamThread.state());
               storeProviders.add(new StreamThreadStateStoreProvider(streamThread));
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org