Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/09/17 15:08:20 UTC

[GitHub] [kafka] andy0x01 opened a new pull request #11334: KAFKA-13246: StoreQueryIntegrationTest#shouldQueryStoresAfterAddingAn…

andy0x01 opened a new pull request #11334:
URL: https://github.com/apache/kafka/pull/11334


   KAFKA-13246: StoreQueryIntegrationTest#shouldQueryStoresAfterAddingAndRemovingStreamThread does not gate on stream state well
   
   The test now waits for the client to transition to REBALANCING/RUNNING after adding/removing a thread as well as to transition to RUNNING before querying the state store.
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vvcephei merged pull request #11334: KAFKA-13246: StoreQueryIntegrationTest#shouldQueryStoresAfterAddingAn…

Posted by GitBox <gi...@apache.org>.
vvcephei merged pull request #11334:
URL: https://github.com/apache/kafka/pull/11334


   





[GitHub] [kafka] wcarlson5 commented on a change in pull request #11334: KAFKA-13246: StoreQueryIntegrationTest#shouldQueryStoresAfterAddingAn…

Posted by GitBox <gi...@apache.org>.
wcarlson5 commented on a change in pull request #11334:
URL: https://github.com/apache/kafka/pull/11334#discussion_r713141647



##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java
##########
@@ -477,6 +477,7 @@ public void shouldQueryStoresAfterAddingAndRemovingStreamThread() throws Excepti
         //Add thread
         final Optional<String> streamThread = kafkaStreams1.addStreamThread();
         assertThat(streamThread.isPresent(), is(true));
+        until(() -> kafkaStreams1.state().isRunningOrRebalancing());

Review comment:
       It is true that we wait for the state below; however, the way we do so causes some exceptions because Streams is in the wrong state. These exceptions are not the point of any of the tests in this file, so we have worked around them by whitelisting exception messages. It would be better to wait for the correct state before trying to do any IQ, as is done here. @andy0x01 the isolation of the add/remove is a nice benefit as well.
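
To illustrate the gating idea discussed in this comment, here is a minimal, self-contained sketch of a polling wait. It is not the helper from `StoreQueryIntegrationTest`: the class `StateGateSketch`, the `State` enum (a stand-in for `KafkaStreams.State`), the timeout parameter, and the 10 ms poll interval are all assumptions made for illustration.

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BooleanSupplier;

public class StateGateSketch {

    // Hypothetical stand-in for KafkaStreams.State; only the values needed here.
    enum State {
        REBALANCING, RUNNING, ERROR;

        boolean isRunningOrRebalancing() {
            return this == RUNNING || this == REBALANCING;
        }
    }

    // Polls the condition until it holds or the timeout elapses.
    // Returns true if the condition held, false on timeout.
    static boolean until(BooleanSupplier condition, long timeoutMs) throws InterruptedException {
        final long deadline = System.nanoTime() + timeoutMs * 1_000_000L;
        while (!condition.getAsBoolean()) {
            if (System.nanoTime() >= deadline) {
                return false;
            }
            Thread.sleep(10); // assumed poll interval
        }
        return true;
    }

    public static void main(String[] args) throws Exception {
        // Simulated client state: starts in REBALANCING, as after adding a thread.
        final AtomicReference<State> state = new AtomicReference<>(State.REBALANCING);

        // Simulated rebalance that completes a little later.
        final Thread rebalance = new Thread(() -> {
            try {
                Thread.sleep(50);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            state.set(State.RUNNING);
        });
        rebalance.start();

        // Gate 1: the client reacted to the thread change.
        final boolean reacted = until(() -> state.get().isRunningOrRebalancing(), 5_000);
        // Gate 2: only query the store once the client is RUNNING.
        final boolean running = until(() -> state.get() == State.RUNNING, 5_000);
        rebalance.join();

        System.out.println("reacted=" + reacted + ", running=" + running);
    }
}
```

Waiting on the state first means a query is never attempted while the client is in the wrong state, so the test would not need to match on exception messages.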







[GitHub] [kafka] andy0x01 commented on a change in pull request #11334: KAFKA-13246: StoreQueryIntegrationTest#shouldQueryStoresAfterAddingAn…

Posted by GitBox <gi...@apache.org>.
andy0x01 commented on a change in pull request #11334:
URL: https://github.com/apache/kafka/pull/11334#discussion_r713063360



##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java
##########
@@ -477,6 +477,7 @@ public void shouldQueryStoresAfterAddingAndRemovingStreamThread() throws Excepti
         //Add thread
         final Optional<String> streamThread = kafkaStreams1.addStreamThread();
         assertThat(streamThread.isPresent(), is(true));
+        until(() -> kafkaStreams1.state().isRunningOrRebalancing());

Review comment:
       You are right. I see the point that it should not make any functional difference.
   
   According to `KafkaStreams.State`: 
   
   > REBALANCING state will transit to RUNNING if all of its threads are in RUNNING state
   
   (so, as soon as the new `StreamThread` is in `RUNNING`)
   
   Although `REBALANCING` is explicitly stated in [KAFKA-13246](https://issues.apache.org/jira/browse/KAFKA-13246):
   
   > StoreQueryIntegrationTest#shouldQueryStoresAfterAddingAndRemovingStreamThread should be improved by waiting for the client to go to rebalancing or running after adding and removing a thread. It should also wait until running before querying the state store 
   
   One small benefit I see here is to assert in isolation of any following code that adding/removing a thread was indeed successful.
   
   @wcarlson5, as the author of the JIRA issue, could you tell us whether we are overlooking anything here?
   Thanks
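
The sentence quoted from `KafkaStreams.State` above can be read as a simple rule. The following is a simplified model of that one sentence only, not the actual Kafka Streams state machine: `ClientStateSketch`, its two enums, and `derive` are hypothetical names, and the real client and thread states have more values than are shown.

```java
import java.util.List;

public class ClientStateSketch {

    // Hypothetical, reduced set of thread states for illustration.
    enum ThreadState { STARTING, PARTITIONS_ASSIGNED, RUNNING }

    // Hypothetical, reduced set of client states for illustration.
    enum ClientState { REBALANCING, RUNNING }

    // Models only the quoted rule: the client is RUNNING when every thread is
    // RUNNING, and stays in REBALANCING otherwise.
    // Note: an empty list yields RUNNING here because allMatch is vacuously true;
    // that edge case is outside what the quoted sentence covers.
    static ClientState derive(List<ThreadState> threads) {
        final boolean allRunning = threads.stream().allMatch(t -> t == ThreadState.RUNNING);
        return allRunning ? ClientState.RUNNING : ClientState.REBALANCING;
    }

    public static void main(String[] args) {
        // A newly added thread that is still starting keeps the client in REBALANCING.
        System.out.println(derive(List.of(ThreadState.RUNNING, ThreadState.STARTING)));
        // Once the new thread is RUNNING too, the client is RUNNING.
        System.out.println(derive(List.of(ThreadState.RUNNING, ThreadState.RUNNING)));
    }
}
```

Under this reading, a wait for `RUNNING` alone eventually succeeds after a thread is added, which is why the extra wait makes no functional difference and only serves to check the add/remove step in isolation.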







[GitHub] [kafka] 3schwartz commented on a change in pull request #11334: KAFKA-13246: StoreQueryIntegrationTest#shouldQueryStoresAfterAddingAn…

Posted by GitBox <gi...@apache.org>.
3schwartz commented on a change in pull request #11334:
URL: https://github.com/apache/kafka/pull/11334#discussion_r712475960



##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java
##########
@@ -477,6 +477,7 @@ public void shouldQueryStoresAfterAddingAndRemovingStreamThread() throws Excepti
         //Add thread
         final Optional<String> streamThread = kafkaStreams1.addStreamThread();
         assertThat(streamThread.isPresent(), is(true));
+        until(() -> kafkaStreams1.state().isRunningOrRebalancing());

Review comment:
       It seems like this isn't needed. The rebalancing will end up in a running state, which is waited for below. I suggest keeping only the condition below, which waits for a running state before querying the state store.

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java
##########
@@ -503,9 +505,9 @@ public void shouldQueryStoresAfterAddingAndRemovingStreamThread() throws Excepti
 
         final Optional<String> removedThreadName = kafkaStreams1.removeStreamThread();
         assertThat(removedThreadName.isPresent(), is(true));
+        until(() -> kafkaStreams1.state().isRunningOrRebalancing());

Review comment:
       Same as above



