Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/06/23 08:47:22 UTC

[GitHub] [kafka] PhilHardwick opened a new pull request #10921: MINOR: Ensure queryable store providers is up to date after adding stream thread

PhilHardwick opened a new pull request #10921:
URL: https://github.com/apache/kafka/pull/10921


   When a new thread is added, the queryable store provider continues to use the store providers it was given when KafkaStreams was instantiated.
   
   I wanted to keep QueryableStoreProvider immutable, so I had to make the queryableStoreProvider field in the KafkaStreams class mutable to allow this change.
   
   This is covered by an integration test that adds a thread and then produces messages with different keys: with the previous code the keys are not in the store, and after the change they are queryable.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ableegoldman commented on pull request #10921: KAFKA-13096: Ensure queryable store providers is up to date after adding stream thread

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on pull request #10921:
URL: https://github.com/apache/kafka/pull/10921#issuecomment-881110915


   By the way, I filed a ticket for this. I think it's actually a pretty serious bug so we should have a JIRA for it. I'm also going to propose it as a blocker for the 3.0 release since it can render IQ completely unusable.





[GitHub] [kafka] PhilHardwick commented on a change in pull request #10921: KAFKA-13096: Ensure queryable store providers is up to date after adding stream thread

Posted by GitBox <gi...@apache.org>.
PhilHardwick commented on a change in pull request #10921:
URL: https://github.com/apache/kafka/pull/10921#discussion_r672966032



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -1081,6 +1079,7 @@ private int getNumStreamThreads(final boolean hasGlobalTopology) {
                             } else {
                                 log.info("Successfully removed {} in {}ms", streamThread.getName(), time.milliseconds() - startMs);
                                 threads.remove(streamThread);
+                                queryableStoreProvider.removeStoreProvider(new StreamThreadStateStoreProvider(streamThread));

Review comment:
       Yeah agreed - that's a better idea. I've changed it to a Map so that a provider can be removed by thread name. To be honest, I'd like to just pass a StreamThread into #addStoreProvider, but then it's hard to test because the StreamThreadStateStoreProvider would be instantiated inside #addStoreProvider, with no way to inject a stub (this would also allow #removeStoreProvider to just be passed a StreamThread, which would make it more consistent).







[GitHub] [kafka] ableegoldman commented on a change in pull request #10921: KAFKA-13096: Ensure queryable store providers is up to date after adding stream thread

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #10921:
URL: https://github.com/apache/kafka/pull/10921#discussion_r672764087



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -1081,6 +1079,7 @@ private int getNumStreamThreads(final boolean hasGlobalTopology) {
                             } else {
                                 log.info("Successfully removed {} in {}ms", streamThread.getName(), time.milliseconds() - startMs);
                                 threads.remove(streamThread);
+                                queryableStoreProvider.removeStoreProvider(new StreamThreadStateStoreProvider(streamThread));

Review comment:
       Ah, yeah I guess it would have always had to handle `DEAD` threads since in the before-time (ie before we added the `add/removeStreamThread` APIs) it was always possible for a thread to just die when hit with an unexpected exception.
   
   That said, I feel a lot better about trimming a removed thread from the list explicitly. Don't want to build up a mass grave of mostly dead threads (well, dead thread store providers) that can never be garbage collected over the application's lifetime

##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -1081,6 +1079,7 @@ private int getNumStreamThreads(final boolean hasGlobalTopology) {
                             } else {
                                 log.info("Successfully removed {} in {}ms", streamThread.getName(), time.milliseconds() - startMs);
                                 threads.remove(streamThread);
+                                queryableStoreProvider.removeStoreProvider(new StreamThreadStateStoreProvider(streamThread));

Review comment:
       It seems a bit weird to have to create a `new StreamThreadStateStoreProvider(streamThread)` just to remove an existing `StreamThreadStateStoreProvider` from this -- can we maybe change it so that `#removeStoreProvider` accepts a `StreamThread` reference, or even just a `String streamThreadName`? And then store a map from name to `StreamThreadStateStoreProvider` or something -- WDYT?
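
A minimal sketch of the map-based idea suggested in this comment. The class names `ThreadStoreProvider` and `ProviderRegistry` are hypothetical stand-ins, not the Kafka classes, and the use of `ConcurrentHashMap` is an assumption made for illustration. The provider is passed in rather than constructed inside the registry, so a test could hand in a stub.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for StreamThreadStateStoreProvider; not the Kafka class.
final class ThreadStoreProvider {
    private final String threadName;

    ThreadStoreProvider(final String threadName) {
        this.threadName = threadName;
    }

    String threadName() {
        return threadName;
    }
}

// Hypothetical registry: providers are keyed by thread name,
// so removal needs only the name and no throwaway provider object.
final class ProviderRegistry {
    private final Map<String, ThreadStoreProvider> providers = new ConcurrentHashMap<>();

    void addStoreProvider(final String threadName, final ThreadStoreProvider provider) {
        providers.put(threadName, provider);
    }

    // Returns true if a provider was registered under that name.
    boolean removeStoreProvider(final String threadName) {
        return providers.remove(threadName) != null;
    }

    int size() {
        return providers.size();
    }
}
```

With this shape, the removal path would call something like `registry.removeStoreProvider(streamThread.getName())` instead of building a new provider just to match an existing one.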







[GitHub] [kafka] ableegoldman commented on a change in pull request #10921: KAFKA-13096: Ensure queryable store providers is up to date after adding stream thread

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #10921:
URL: https://github.com/apache/kafka/pull/10921#discussion_r672765942



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -1081,6 +1079,7 @@ private int getNumStreamThreads(final boolean hasGlobalTopology) {
                             } else {
                                 log.info("Successfully removed {} in {}ms", streamThread.getName(), time.milliseconds() - startMs);
                                 threads.remove(streamThread);
+                                queryableStoreProvider.removeStoreProvider(new StreamThreadStateStoreProvider(streamThread));

Review comment:
       It seems a bit weird to have to create a `new StreamThreadStateStoreProvider(streamThread)` just to remove an existing `StreamThreadStateStoreProvider` from this -- can we maybe change it so that `#removeStoreProvider` accepts a `StreamThread` reference, or even just a `String streamThreadName`? And then store a map from name to `StreamThreadStateStoreProvider` or something -- WDYT?







[GitHub] [kafka] ableegoldman commented on a change in pull request #10921: KAFKA-13096: Ensure queryable store providers is up to date after adding stream thread

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #10921:
URL: https://github.com/apache/kafka/pull/10921#discussion_r673585765



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/QueryableStoreProvider.java
##########
@@ -56,8 +60,16 @@ public QueryableStoreProvider(final List<StreamThreadStateStoreProvider> storePr
             return queryableStoreType.create(globalStoreProvider, storeName);
         }
         return queryableStoreType.create(
-            new WrappingStoreProvider(storeProviders, storeQueryParameters),
+            new WrappingStoreProvider(new ArrayList<>(storeProviders.values()), storeQueryParameters),

Review comment:
       Hmm...I'm a little less sure about this, but I think we should make sure that WrappingStoreProvider's view of the stream thread storeProviders also stays up to date when threads are added/removed. Basically if a user calls KafkaStreams.store() then adds/removes a bunch of threads without refreshing the store provider, any subsequent get() on that provider would only see the threads that existed at the time KafkaStreams.store() was called if we make a copy like this.
   
   We should be able to just modify the WrappingStoreProvider constructor/local field to be a Set or even a Collection instead, since all it ever does is loop over this. Then we can just pass in storeProviders.values() and it's all good
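
The difference between copying and passing the live view can be seen in a small sketch. It uses plain strings in place of store providers and a `ConcurrentHashMap` as the backing map; both are assumptions made for illustration, not the PR's code, and `ViewVersusCopy` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class ViewVersusCopy {
    // Returns {snapshotSize, liveViewSize} observed after a later insertion into the map.
    static int[] sizesAfterLaterAdd() {
        final Map<String, String> providers = new ConcurrentHashMap<>();
        providers.put("thread-1", "provider-1");

        // Copy: frozen at the moment of construction.
        final List<String> snapshot = new ArrayList<>(providers.values());
        // View: backed by the map, so later changes show through.
        final Collection<String> liveView = providers.values();

        // Simulates a stream thread being added after the wrapper was handed out.
        providers.put("thread-2", "provider-2");

        return new int[] {snapshot.size(), liveView.size()};
    }
}
```

The snapshot still reports one entry while the view reports two, which is the staleness described above: a wrapper built from the copy never sees threads added later.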







[GitHub] [kafka] PhilHardwick commented on pull request #10921: MINOR: Ensure queryable store providers is up to date after adding stream thread

Posted by GitBox <gi...@apache.org>.
PhilHardwick commented on pull request #10921:
URL: https://github.com/apache/kafka/pull/10921#issuecomment-869736821


   Please could @ableegoldman or @wcarlson5 take a look, thanks!





[GitHub] [kafka] PhilHardwick commented on a change in pull request #10921: KAFKA-13096: Ensure queryable store providers is up to date after adding stream thread

Posted by GitBox <gi...@apache.org>.
PhilHardwick commented on a change in pull request #10921:
URL: https://github.com/apache/kafka/pull/10921#discussion_r671069535



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -1081,6 +1079,7 @@ private int getNumStreamThreads(final boolean hasGlobalTopology) {
                             } else {
                                 log.info("Successfully removed {} in {}ms", streamThread.getName(), time.milliseconds() - startMs);
                                 threads.remove(streamThread);
+                                queryableStoreProvider.removeStoreProvider(new StreamThreadStateStoreProvider(streamThread));

Review comment:
       The test actually passes with this line commented out. This is because StreamThreadStateStoreProvider already deals with dead threads being left in there - https://github.com/apache/kafka/blob/4fd6d2bec8a6249e211b9769e887cc24d5cf1444/streams/src/main/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProvider.java#L47. So without this line, that if condition gets hit. With this `removeStoreProvider` call the condition isn't hit, so it's a small optimisation.
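
A rough sketch of the kind of guard described here, under simplifying assumptions: `ThreadState` and `GuardedStoreProvider` are hypothetical names, the state model is reduced to two values, and stores are represented by their names. It is not the code at the linked line.

```java
import java.util.Collections;
import java.util.List;

// Hypothetical, simplified thread states; the real StreamThread has more.
enum ThreadState { RUNNING, DEAD }

// Hypothetical stand-in for a per-thread store provider.
final class GuardedStoreProvider {
    private final List<String> storeNames;
    private volatile ThreadState state = ThreadState.RUNNING;

    GuardedStoreProvider(final List<String> storeNames) {
        this.storeNames = storeNames;
    }

    void markDead() {
        state = ThreadState.DEAD;
    }

    // A dead thread contributes no stores, so a leftover provider
    // still returns a correct (empty) answer on every query.
    List<String> stores() {
        if (state == ThreadState.DEAD) {
            return Collections.emptyList();
        }
        return storeNames;
    }
}
```

In this sketch a provider for a removed thread stays correct but is still consulted on every lookup, which is why explicitly removing it only saves work rather than changing results.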







[GitHub] [kafka] PhilHardwick closed pull request #10921: KAFKA-13096: Ensure queryable store providers is up to date after adding stream thread

Posted by GitBox <gi...@apache.org>.
PhilHardwick closed pull request #10921:
URL: https://github.com/apache/kafka/pull/10921


   





[GitHub] [kafka] ableegoldman commented on pull request #10921: KAFKA-13096: Ensure queryable store providers is up to date after adding stream thread

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on pull request #10921:
URL: https://github.com/apache/kafka/pull/10921#issuecomment-884482756


   Merged to trunk and cherrypicked to 2.8 and 3.0 (cc @kkonstantine)





[GitHub] [kafka] ableegoldman merged pull request #10921: KAFKA-13096: Ensure queryable store providers is up to date after adding stream thread

Posted by GitBox <gi...@apache.org>.
ableegoldman merged pull request #10921:
URL: https://github.com/apache/kafka/pull/10921


   





[GitHub] [kafka] PhilHardwick commented on pull request #10921: KAFKA-13096: Ensure queryable store providers is up to date after adding stream thread

Posted by GitBox <gi...@apache.org>.
PhilHardwick commented on pull request #10921:
URL: https://github.com/apache/kafka/pull/10921#issuecomment-881274661


   Closed by accident there^
   
   Thanks @ableegoldman for creating a JIRA ticket for this. I've updated the PR with a test for querying the stores after a thread is removed. I've currently gone for the QueryableStoreProvider having its own ArrayList and KafkaStreams calling the add/remove methods on QueryableStoreProvider. But I'm also happy to change it to an ArrayList that KafkaStreams owns, with QueryableStoreProvider just holding a reference to it (I guess either way, when adding or removing a thread you would need to remember to update either the ArrayList or the QueryableStoreProvider).

