Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/11/03 13:51:21 UTC

[GitHub] [kafka] wcarlson5 opened a new pull request #9543: makes the Stream thread list resizable

wcarlson5 opened a new pull request #9543:
URL: https://github.com/apache/kafka/pull/9543


   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] wcarlson5 commented on pull request #9543: KAFKA-10500: Makes the Stream thread list resizable

Posted by GitBox <gi...@apache.org>.
wcarlson5 commented on pull request #9543:
URL: https://github.com/apache/kafka/pull/9543#issuecomment-721881768


   @cadonna Looks like all those changes made sense





[GitHub] [kafka] wcarlson5 commented on a change in pull request #9543: KAFKA-10500: Makes the Stream thread list resizable

Posted by GitBox <gi...@apache.org>.
wcarlson5 commented on a change in pull request #9543:
URL: https://github.com/apache/kafka/pull/9543#discussion_r517516580



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -719,8 +718,7 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder,
         }
 
         // create the stream thread, global update thread, and cleanup thread
-        threads = new StreamThread[numStreamThreads];
-
+        threads = new ArrayList<>(numStreamThreads);

Review comment:
       That is fair







[GitHub] [kafka] wcarlson5 commented on a change in pull request #9543: KAFKA-10500: Makes the Stream thread list resizable

Posted by GitBox <gi...@apache.org>.
wcarlson5 commented on a change in pull request #9543:
URL: https://github.com/apache/kafka/pull/9543#discussion_r517630320



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -783,12 +781,13 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder,
                 stateDirectory,
                 delegatingStateRestoreListener,
                 i + 1);
-            threadState.put(threads[i].getId(), threads[i].state());
-            storeProviders.add(new StreamThreadStateStoreProvider(threads[i]));
+            threads.add(i, streamThread);
+            threadState.put(streamThread.getId(), streamThread.state());
+            storeProviders.add(new StreamThreadStateStoreProvider(threads.get(i)));

Review comment:
       Good catch







[GitHub] [kafka] cadonna commented on a change in pull request #9543: KAFKA-10500: Makes the Stream thread list resizable

Posted by GitBox <gi...@apache.org>.
cadonna commented on a change in pull request #9543:
URL: https://github.com/apache/kafka/pull/9543#discussion_r518057272



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -719,8 +718,7 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder,
         }
 
         // create the stream thread, global update thread, and cleanup thread
-        threads = new StreamThread[numStreamThreads];
-
+        threads = new LinkedList<>();

Review comment:
       Currently, only the constructor adds elements to the list. All later accesses are read-only, so as far as I can see we would not need a synchronized list. When we implement the APIs to add and remove stream threads, we will probably need synchronization. My proposal is to leave it as a synchronized list for now, in case we forget to think about it afterwards, and then to reconsider how we synchronize the accesses.
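
A minimal sketch of the synchronized-list proposal, using a hypothetical `Worker` class in place of `StreamThread` (the class and method names here are illustrative and not taken from the PR). `Collections.synchronizedList` makes individual calls such as `add` atomic, but traversal, including via `stream()`, still has to be guarded by locking the list manually. That is one reason the accesses would need another look once threads can be added and removed at runtime.

```java
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;

public class SynchronizedThreadList {
    // Hypothetical stand-in for StreamThread; it only records whether it is alive.
    static final class Worker {
        private final boolean alive;

        Worker(final boolean alive) {
            this.alive = alive;
        }

        boolean isAlive() {
            return alive;
        }
    }

    // Each single call on the wrapper (add, remove, size, ...) is synchronized.
    private final List<Worker> workers = Collections.synchronizedList(new LinkedList<>());

    void add(final Worker worker) {
        workers.add(worker);
    }

    int countAlive() {
        // Traversal is not atomic on a synchronized wrapper, so hold the
        // list's own lock for the duration of the stream.
        synchronized (workers) {
            return Math.toIntExact(workers.stream().filter(Worker::isAlive).count());
        }
    }

    public static void main(final String[] args) {
        final SynchronizedThreadList list = new SynchronizedThreadList();
        list.add(new Worker(true));
        list.add(new Worker(false));
        list.add(new Worker(true));
        System.out.println(list.countAlive());
    }
}
```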







[GitHub] [kafka] vvcephei merged pull request #9543: KAFKA-10500: Makes the Stream thread list resizable

Posted by GitBox <gi...@apache.org>.
vvcephei merged pull request #9543:
URL: https://github.com/apache/kafka/pull/9543


   





[GitHub] [kafka] vvcephei commented on a change in pull request #9543: KAFKA-10500: Makes the Stream thread list resizable

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #9543:
URL: https://github.com/apache/kafka/pull/9543#discussion_r517539432



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -719,8 +718,7 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder,
         }
 
         // create the stream thread, global update thread, and cleanup thread
-        threads = new StreamThread[numStreamThreads];
-
+        threads = new LinkedList<>();

Review comment:
       Should this collection be thread-safe? (Or are all accesses inside synchronized blocks anyway?)

##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -783,12 +781,13 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder,
                 stateDirectory,
                 delegatingStateRestoreListener,
                 i + 1);
-            threadState.put(threads[i].getId(), threads[i].state());
-            storeProviders.add(new StreamThreadStateStoreProvider(threads[i]));
+            threads.add(i, streamThread);
+            threadState.put(streamThread.getId(), streamThread.state());
+            storeProviders.add(new StreamThreadStateStoreProvider(threads.get(i)));

Review comment:
       ```suggestion
               storeProviders.add(new StreamThreadStateStoreProvider(streamThread));
   ```
   
   `get(i)` is also O(n) for a linked list. Again, this is admittedly a nit, since the list is small.

##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -783,12 +781,13 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder,
                 stateDirectory,
                 delegatingStateRestoreListener,
                 i + 1);
-            threadState.put(threads[i].getId(), threads[i].state());
-            storeProviders.add(new StreamThreadStateStoreProvider(threads[i]));
+            threads.add(i, streamThread);

Review comment:
       A bit of a nitpick, but this operation is O(n) for LinkedList. Better to just `add(streamThread)` if you want to use LinkedList.
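
A small sketch of the two review points in this message, with `String` elements standing in for `StreamThread` instances (an assumption made for illustration). Positional operations on a `LinkedList` walk the nodes from whichever end is nearer to the index, whereas appending with `add(element)` and reusing the local variable needs no lookup at all. Both variants below produce the same contents.

```java
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;

public class AppendInsteadOfIndex {
    // Positional variant, mirroring the shape of the reviewed code.
    static List<String> buildWithIndex(final int n, final List<String> providers) {
        final List<String> threads = new LinkedList<>();
        for (int i = 0; i < n; i++) {
            final String streamThread = "thread-" + (i + 1);
            threads.add(i, streamThread);    // positional insert
            providers.add(threads.get(i));   // positional lookup
        }
        return threads;
    }

    // Suggested variant: append, then reuse the local reference.
    static List<String> buildWithAppend(final int n, final List<String> providers) {
        final List<String> threads = new LinkedList<>();
        for (int i = 0; i < n; i++) {
            final String streamThread = "thread-" + (i + 1);
            threads.add(streamThread);
            providers.add(streamThread);
        }
        return threads;
    }

    public static void main(final String[] args) {
        final List<String> indexedProviders = new ArrayList<>();
        final List<String> appendedProviders = new ArrayList<>();
        final List<String> indexed = buildWithIndex(3, indexedProviders);
        final List<String> appended = buildWithAppend(3, appendedProviders);
        // Both comparisons should report that the two variants agree.
        System.out.println(indexed.equals(appended));
        System.out.println(indexedProviders.equals(appendedProviders));
    }
}
```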







[GitHub] [kafka] vvcephei commented on pull request #9543: KAFKA-10500: Makes the Stream thread list resizable

Posted by GitBox <gi...@apache.org>.
vvcephei commented on pull request #9543:
URL: https://github.com/apache/kafka/pull/9543#issuecomment-722642034


   The test failures look unrelated:
   ```
   
   Build / JDK 8 / org.apache.kafka.connect.mirror.MirrorConnectorsIntegrationTest.testOneWayReplicationWithAutoOffsetSync | 34 sec | 1
   Build / JDK 11 / kafka.api.PlaintextConsumerTest.testLowMaxFetchSizeForRequestAndPartition | 2 min 3 sec | 1
   Build / JDK 11 / kafka.network.DynamicConnectionQuotaTest.testDynamicListenerConnectionCreationRateQuota | 59 sec | 1
   ```





[GitHub] [kafka] wcarlson5 commented on pull request #9543: makes the Stream thread list resizable

Posted by GitBox <gi...@apache.org>.
wcarlson5 commented on pull request #9543:
URL: https://github.com/apache/kafka/pull/9543#issuecomment-720787984


   @cadonna First part to make the streamThreads list resizable





[GitHub] [kafka] wcarlson5 commented on a change in pull request #9543: KAFKA-10500: Makes the Stream thread list resizable

Posted by GitBox <gi...@apache.org>.
wcarlson5 commented on a change in pull request #9543:
URL: https://github.com/apache/kafka/pull/9543#discussion_r517629961



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -719,8 +718,7 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder,
         }
 
         // create the stream thread, global update thread, and cleanup thread
-        threads = new StreamThread[numStreamThreads];
-
+        threads = new LinkedList<>();

Review comment:
       They are not all in synchronized blocks, so I think it probably should be thread-safe.







[GitHub] [kafka] wcarlson5 removed a comment on pull request #9543: KAFKA-10500: Makes the Stream thread list resizable

Posted by GitBox <gi...@apache.org>.
wcarlson5 removed a comment on pull request #9543:
URL: https://github.com/apache/kafka/pull/9543#issuecomment-721154784


   retest this please





[GitHub] [kafka] wcarlson5 commented on pull request #9543: makes the Stream thread list resizable

Posted by GitBox <gi...@apache.org>.
wcarlson5 commented on pull request #9543:
URL: https://github.com/apache/kafka/pull/9543#issuecomment-721154784


   retest this please





[GitHub] [kafka] wcarlson5 commented on a change in pull request #9543: KAFKA-10500: Makes the Stream thread list resizable

Posted by GitBox <gi...@apache.org>.
wcarlson5 commented on a change in pull request #9543:
URL: https://github.com/apache/kafka/pull/9543#discussion_r517630194



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -783,12 +781,13 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder,
                 stateDirectory,
                 delegatingStateRestoreListener,
                 i + 1);
-            threadState.put(threads[i].getId(), threads[i].state());
-            storeProviders.add(new StreamThreadStateStoreProvider(threads[i]));
+            threads.add(i, streamThread);

Review comment:
       yep, should have changed that when I moved from ArrayList







[GitHub] [kafka] cadonna commented on a change in pull request #9543: KAFKA-10500: Makes the Stream thread list resizable

Posted by GitBox <gi...@apache.org>.
cadonna commented on a change in pull request #9543:
URL: https://github.com/apache/kafka/pull/9543#discussion_r517426911



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -145,7 +145,7 @@
     private final String clientId;
     private final Metrics metrics;
     private final StreamsConfig config;
-    protected final StreamThread[] threads;
+    protected final ArrayList<StreamThread> threads;

Review comment:
       I would prefer to use `List` instead of `ArrayList` to be more generic.
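
To illustrate the suggestion, a sketch with a hypothetical `ThreadRegistry` class (not part of Kafka) and `String` elements in place of `StreamThread`: when the field is declared as `List`, the backing implementation can be an `ArrayList`, a `LinkedList`, or a synchronized wrapper without changing the field's type or any code that reads it.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;

public class ThreadRegistry {
    // Declared against the interface; the concrete type is chosen in one place.
    protected final List<String> threads;

    ThreadRegistry(final List<String> backingList) {
        this.threads = backingList;
    }

    void register(final String name) {
        threads.add(name);
    }

    int size() {
        return threads.size();
    }

    public static void main(final String[] args) {
        // The same class works unchanged with each backing implementation.
        final List<ThreadRegistry> registries = new ArrayList<>();
        registries.add(new ThreadRegistry(new ArrayList<>()));
        registries.add(new ThreadRegistry(new LinkedList<>()));
        registries.add(new ThreadRegistry(Collections.synchronizedList(new LinkedList<>())));
        for (final ThreadRegistry registry : registries) {
            registry.register("thread-1");
            registry.register("thread-2");
            System.out.println(registry.threads.getClass().getSimpleName() + ": " + registry.size());
        }
    }
}
```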

##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -719,8 +718,7 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder,
         }
 
         // create the stream thread, global update thread, and cleanup thread
-        threads = new StreamThread[numStreamThreads];
-
+        threads = new ArrayList<>(numStreamThreads);

Review comment:
       I think it is better to keep the default initial capacity of an `ArrayList`. Otherwise, the first time a stream thread is added, we immediately run into a memory allocation. Since we do not know how many stream threads to expect, let's use the default.
   We could also consider using a `LinkedList`, since we never access by index in production code.

##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -783,12 +781,13 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder,
                 stateDirectory,
                 delegatingStateRestoreListener,
                 i + 1);
-            threadState.put(threads[i].getId(), threads[i].state());
-            storeProviders.add(new StreamThreadStateStoreProvider(threads[i]));
+            threads.add(i, streamThread);
+            threadState.put(threads.get(i).getId(), threads.get(i).state());
+            storeProviders.add(new StreamThreadStateStoreProvider(threads.get(i)));
         }
 
         ClientMetrics.addNumAliveStreamThreadMetric(streamsMetrics, (metricsConfig, now) ->
-            Math.toIntExact(Arrays.stream(threads).filter(thread -> thread.state().isAlive()).count()));
+            Math.toIntExact(Arrays.stream(threads.toArray(new StreamThread[numStreamThreads])).filter(thread -> thread.state().isAlive()).count()));

Review comment:
       Please simplify to
   ```suggestion
               Math.toIntExact(threads.stream().filter(thread -> thread.state().isAlive()).count()));
   ```
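
A sketch of why the direct `threads.stream()` form is also the more robust one once the list can change size, using a hypothetical `Worker` class in place of `StreamThread`. `List.toArray(T[])` called with a freshly allocated array that is longer than the list leaves `null` in the unused slots, so streaming that array and calling a method on each element would throw a `NullPointerException` if the list ever held fewer than `numStreamThreads` entries.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class AliveThreadCount {
    // Hypothetical stand-in for StreamThread; it only records whether it is alive.
    static final class Worker {
        private final boolean alive;

        Worker(final boolean alive) {
            this.alive = alive;
        }

        boolean isAlive() {
            return alive;
        }
    }

    // Shape of the code in the diff: copy into a fixed-size array, then stream it.
    static int countViaArray(final List<Worker> threads, final int numStreamThreads) {
        return Math.toIntExact(
            Arrays.stream(threads.toArray(new Worker[numStreamThreads]))
                .filter(Worker::isAlive)
                .count());
    }

    // Shape of the suggestion: stream the list directly.
    static int countViaStream(final List<Worker> threads) {
        return Math.toIntExact(threads.stream().filter(Worker::isAlive).count());
    }

    public static void main(final String[] args) {
        final List<Worker> threads = new ArrayList<>();
        threads.add(new Worker(true));
        threads.add(new Worker(false));

        // The two forms agree while the list size matches numStreamThreads.
        System.out.println(countViaArray(threads, 2) == countViaStream(threads));

        // With a larger numStreamThreads the array has a trailing null slot.
        try {
            countViaArray(threads, 3);
        } catch (final NullPointerException e) {
            System.out.println("array form failed on a null slot");
        }
    }
}
```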

##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -783,12 +781,13 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder,
                 stateDirectory,
                 delegatingStateRestoreListener,
                 i + 1);
-            threadState.put(threads[i].getId(), threads[i].state());
-            storeProviders.add(new StreamThreadStateStoreProvider(threads[i]));
+            threads.add(i, streamThread);
+            threadState.put(threads.get(i).getId(), threads.get(i).state());
+            storeProviders.add(new StreamThreadStateStoreProvider(threads.get(i)));

Review comment:
       You can simplify to
   ```suggestion
               threads.add(streamThread);
               threadState.put(streamThread.getId(), streamThread.state());
               storeProviders.add(new StreamThreadStateStoreProvider(streamThread));
   ```



