You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/10/28 17:22:25 UTC

[GitHub] [kafka] vvcephei opened a new pull request #9521: KAFKA-10638: Fix QueryableStateIntegrationTest

vvcephei opened a new pull request #9521:
URL: https://github.com/apache/kafka/pull/9521


   This test has been observed to have flaky failures.
   Apparently, in the failed runs, Streams had entered a rebalance
   before some of the assertions were made. We recently made
   IQ a little stricter on whether it would return errors instead of
   null responses in such cases:
   KAFKA-10598: Improve IQ name and type checks (#9408)
   
   As a result, we have started seeing failures now instead of
   silently executing an invalid test (I.e., it was asserting the
   return to be `null`, but the result was `null` for the wrong
   reason).
   
   Now, if the test discovers that Streams is no longer running,
   it will repeat the verification until it actually gets a valid
   positive or negative result.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vvcephei merged pull request #9521: KAFKA-10638: Fix QueryableStateIntegrationTest

Posted by GitBox <gi...@apache.org>.
vvcephei merged pull request #9521:
URL: https://github.com/apache/kafka/pull/9521


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vvcephei commented on a change in pull request #9521: KAFKA-10638: Fix QueryableStateIntegrationTest

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #9521:
URL: https://github.com/apache/kafka/pull/9521#discussion_r514323639



##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java
##########
@@ -117,17 +126,34 @@ public void shouldQueryOnlyActivePartitionStoresByDefault() throws Exception {
 
         // Assert that all messages in the first batch were processed in a timely manner
         assertThat(semaphore.tryAcquire(batch1NumMessages, 60, TimeUnit.SECONDS), is(equalTo(true)));
-        final KeyQueryMetadata keyQueryMetadata = kafkaStreams1.queryMetadataForKey(TABLE_NAME, key, (topic, somekey, value, numPartitions) -> 0);
-
-        final QueryableStoreType<ReadOnlyKeyValueStore<Integer, Integer>> queryableStoreType = QueryableStoreTypes.keyValueStore();
-        final ReadOnlyKeyValueStore<Integer, Integer> store1 = IntegrationTestUtils.getStore(TABLE_NAME, kafkaStreams1, queryableStoreType);
-        final ReadOnlyKeyValueStore<Integer, Integer> store2 = IntegrationTestUtils.getStore(TABLE_NAME, kafkaStreams2, queryableStoreType);
-
-        final boolean kafkaStreams1IsActive = (keyQueryMetadata.activeHost().port() % 2) == 1;
-
-        // Assert that only active is able to query for a key by default
-        assertThat(kafkaStreams1IsActive ? store1.get(key) : store2.get(key), is(notNullValue()));
-        assertThat(kafkaStreams1IsActive ? store2.get(key) : store1.get(key), is(nullValue()));
+        until(() -> {
+
+            final KeyQueryMetadata keyQueryMetadata = kafkaStreams1.queryMetadataForKey(TABLE_NAME, key, (topic, somekey, value, numPartitions) -> 0);
+
+            final QueryableStoreType<ReadOnlyKeyValueStore<Integer, Integer>> queryableStoreType = keyValueStore();
+            final ReadOnlyKeyValueStore<Integer, Integer> store1 = getStore(TABLE_NAME, kafkaStreams1, queryableStoreType);
+            final ReadOnlyKeyValueStore<Integer, Integer> store2 = getStore(TABLE_NAME, kafkaStreams2, queryableStoreType);
+
+            final boolean kafkaStreams1IsActive = (keyQueryMetadata.activeHost().port() % 2) == 1;
+
+            // Assert that only active is able to query for a key by default
+            assertThat(kafkaStreams1IsActive ? store1.get(key) : store2.get(key), is(notNullValue()));
+            try {
+                if (kafkaStreams1IsActive) {
+                    assertThat(store2.get(key), is(nullValue()));
+                } else {
+                    assertThat(store1.get(key), is(nullValue()));
+                }
+                return true;
+            } catch (final InvalidStateStoreException exception) {

Review comment:
       I wanted to keep the concerns separate, so that unexpected exceptions would cause the test to fail fast. The idea is that `until` is the inverse of `while`, namely, it just loops as long as the condition evaluates to `false`. If the condition throws an exception, then the loop also throws, just like the real `while` loop.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vvcephei commented on a change in pull request #9521: KAFKA-10638: Fix QueryableStateIntegrationTest

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #9521:
URL: https://github.com/apache/kafka/pull/9521#discussion_r513628177



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/QueryableStoreType.java
##########
@@ -44,7 +44,7 @@
      * @param storeProvider     provides access to all the underlying StateStore instances
      * @param storeName         The name of the Store
      * @return a read-only interface over a {@code StateStore}
-     *        (cf. {@link org.apache.kafka.streams.state.QueryableStoreTypes.KeyValueStoreType})
+     *        (cf. {@link QueryableStoreTypes.KeyValueStoreType})

Review comment:
       This class is in the same package, so the fully-qualified name is not necessary.

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java
##########
@@ -153,51 +179,75 @@ public void shouldQuerySpecificActivePartitionStores() throws Exception {
 
         // Assert that all messages in the first batch were processed in a timely manner
         assertThat(semaphore.tryAcquire(batch1NumMessages, 60, TimeUnit.SECONDS), is(equalTo(true)));
-        final KeyQueryMetadata keyQueryMetadata = kafkaStreams1.queryMetadataForKey(TABLE_NAME, key, (topic, somekey, value, numPartitions) -> 0);
-
-        //key belongs to this partition
-        final int keyPartition = keyQueryMetadata.partition();
-
-        //key doesn't belongs to this partition
-        final int keyDontBelongPartition = (keyPartition == 0) ? 1 : 0;
-        final boolean kafkaStreams1IsActive = (keyQueryMetadata.activeHost().port() % 2) == 1;
-
-        StoreQueryParameters<ReadOnlyKeyValueStore<Integer, Integer>> storeQueryParam =
-            StoreQueryParameters.<ReadOnlyKeyValueStore<Integer, Integer>>fromNameAndType(TABLE_NAME, QueryableStoreTypes.keyValueStore())
-                .withPartition(keyPartition);
-        ReadOnlyKeyValueStore<Integer, Integer> store1 = null;
-        ReadOnlyKeyValueStore<Integer, Integer> store2 = null;
-        if (kafkaStreams1IsActive) {
-            store1 = IntegrationTestUtils.getStore(kafkaStreams1, storeQueryParam);
-        } else {
-            store2 = IntegrationTestUtils.getStore(kafkaStreams2, storeQueryParam);
-        }
-
-        if (kafkaStreams1IsActive) {
-            assertThat(store1, is(notNullValue()));
-            assertThat(store2, is(nullValue()));
-        } else {
-            assertThat(store2, is(notNullValue()));
-            assertThat(store1, is(nullValue()));
-        }
+        until(() -> {
+            final KeyQueryMetadata keyQueryMetadata = kafkaStreams1.queryMetadataForKey(TABLE_NAME, key, (topic, somekey, value, numPartitions) -> 0);
+
+            //key belongs to this partition
+            final int keyPartition = keyQueryMetadata.partition();
+
+            //key doesn't belongs to this partition
+            final int keyDontBelongPartition = (keyPartition == 0) ? 1 : 0;
+            final boolean kafkaStreams1IsActive = (keyQueryMetadata.activeHost().port() % 2) == 1;
+
+            final StoreQueryParameters<ReadOnlyKeyValueStore<Integer, Integer>> storeQueryParam =
+                StoreQueryParameters.<ReadOnlyKeyValueStore<Integer, Integer>>fromNameAndType(TABLE_NAME, keyValueStore())
+                    .withPartition(keyPartition);
+            ReadOnlyKeyValueStore<Integer, Integer> store1 = null;
+            ReadOnlyKeyValueStore<Integer, Integer> store2 = null;
+            if (kafkaStreams1IsActive) {
+                store1 = getStore(kafkaStreams1, storeQueryParam);
+            } else {
+                store2 = getStore(kafkaStreams2, storeQueryParam);
+            }
+
+            if (kafkaStreams1IsActive) {
+                assertThat(store1, is(notNullValue()));
+                assertThat(store2, is(nullValue()));
+            } else {
+                assertThat(store2, is(notNullValue()));
+                assertThat(store1, is(nullValue()));
+            }
+
+            // Assert that only active for a specific requested partition serves key if stale stores and not enabled
+            assertThat(kafkaStreams1IsActive ? store1.get(key) : store2.get(key), is(notNullValue()));
+
+            final StoreQueryParameters<ReadOnlyKeyValueStore<Integer, Integer>> storeQueryParam2 =
+                StoreQueryParameters.<ReadOnlyKeyValueStore<Integer, Integer>>fromNameAndType(TABLE_NAME, keyValueStore())
+                .withPartition(keyDontBelongPartition);
 
-        // Assert that only active for a specific requested partition serves key if stale stores and not enabled
-        assertThat(kafkaStreams1IsActive ? store1.get(key) : store2.get(key), is(notNullValue()));
 
-        storeQueryParam = StoreQueryParameters.<ReadOnlyKeyValueStore<Integer, Integer>>fromNameAndType(TABLE_NAME, QueryableStoreTypes.keyValueStore())
-            .withPartition(keyDontBelongPartition);
-        ReadOnlyKeyValueStore<Integer, Integer> store3 = null;
-        ReadOnlyKeyValueStore<Integer, Integer> store4 = null;
-        if (!kafkaStreams1IsActive) {
-            store3 = IntegrationTestUtils.getStore(kafkaStreams1, storeQueryParam);
-        } else {
-            store4 = IntegrationTestUtils.getStore(kafkaStreams2, storeQueryParam);
-        }
 
-        // Assert that key is not served when wrong specific partition is requested
-        // If kafkaStreams1 is active for keyPartition, kafkaStreams2 would be active for keyDontBelongPartition
-        // So, in that case, store3 would be null and the store4 would not return the value for key as wrong partition was requested
-        assertThat(kafkaStreams1IsActive ? store4.get(key) : store3.get(key), is(nullValue()));
+            try {
+                // Assert that key is not served when wrong specific partition is requested
+                // If kafkaStreams1 is active for keyPartition, kafkaStreams2 would be active for keyDontBelongPartition
+                // So, in that case, store3 would be null and the store4 would not return the value for key as wrong partition was requested
+                if (kafkaStreams1IsActive) {
+                    assertThat(getStore(kafkaStreams2, storeQueryParam2).get(key), is(nullValue()));
+                    final InvalidStateStoreException exception =
+                        assertThrows(InvalidStateStoreException.class, () -> getStore(kafkaStreams1, storeQueryParam2).get(key));
+                    assertThat(
+                        exception.getMessage(),
+                        containsString("The specified partition 1 for store source-table does not exist.")
+                    );
+                } else {
+                    assertThat(getStore(kafkaStreams1, storeQueryParam2).get(key), is(nullValue()));
+                    final InvalidStateStoreException exception =
+                        assertThrows(InvalidStateStoreException.class, () -> getStore(kafkaStreams2, storeQueryParam2).get(key));
+                    assertThat(
+                        exception.getMessage(),
+                        containsString("The specified partition 1 for store source-table does not exist.")
+                    );
+                }
+                return true;
+            } catch (final InvalidStateStoreException exception) {
+                assertThat(
+                    exception.getMessage(),
+                    containsString("Cannot get state store source-table because the stream thread is PARTITIONS_ASSIGNED, not RUNNING")
+                );
+                LOG.info("Streams wasn't running. Will try again.");
+                return false;

Review comment:
       Also, here, if we find that Streams is rebalancing, we'll try the whole verification again, including to re-discover the stores in case the stores have swapped ownership.

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java
##########
@@ -337,34 +385,49 @@ public void shouldQuerySpecificStalePartitionStoresMultiStreamThreads() throws E
 
         //key doesn't belongs to this partition
         final int keyDontBelongPartition = (keyPartition == 0) ? 1 : 0;
-        final QueryableStoreType<ReadOnlyKeyValueStore<Integer, Integer>> queryableStoreType = QueryableStoreTypes.keyValueStore();
+        final QueryableStoreType<ReadOnlyKeyValueStore<Integer, Integer>> queryableStoreType = keyValueStore();
 
         // Assert that both active and standby are able to query for a key
         final StoreQueryParameters<ReadOnlyKeyValueStore<Integer, Integer>> param = StoreQueryParameters
-                .fromNameAndType(TABLE_NAME, queryableStoreType)
-                .enableStaleStores()
-                .withPartition(keyPartition);
+            .fromNameAndType(TABLE_NAME, queryableStoreType)
+            .enableStaleStores()
+            .withPartition(keyPartition);
         TestUtils.waitForCondition(() -> {
-            final ReadOnlyKeyValueStore<Integer, Integer> store1 = IntegrationTestUtils.getStore(kafkaStreams1, param);
+            final ReadOnlyKeyValueStore<Integer, Integer> store1 = getStore(kafkaStreams1, param);
             return store1.get(key) != null;
         }, "store1 cannot find results for key");
         TestUtils.waitForCondition(() -> {
-            final ReadOnlyKeyValueStore<Integer, Integer> store2 = IntegrationTestUtils.getStore(kafkaStreams2, param);
+            final ReadOnlyKeyValueStore<Integer, Integer> store2 = getStore(kafkaStreams2, param);
             return store2.get(key) != null;
         }, "store2 cannot find results for key");
 
         final StoreQueryParameters<ReadOnlyKeyValueStore<Integer, Integer>> otherParam = StoreQueryParameters
-                .fromNameAndType(TABLE_NAME, queryableStoreType)
-                .enableStaleStores()
-                .withPartition(keyDontBelongPartition);
-        final ReadOnlyKeyValueStore<Integer, Integer> store3 = IntegrationTestUtils.getStore(kafkaStreams1, otherParam);
-        final ReadOnlyKeyValueStore<Integer, Integer> store4 = IntegrationTestUtils.getStore(kafkaStreams2, otherParam);
+            .fromNameAndType(TABLE_NAME, queryableStoreType)
+            .enableStaleStores()
+            .withPartition(keyDontBelongPartition);
+        final ReadOnlyKeyValueStore<Integer, Integer> store3 = getStore(kafkaStreams1, otherParam);
+        final ReadOnlyKeyValueStore<Integer, Integer> store4 = getStore(kafkaStreams2, otherParam);
 
         // Assert that
         assertThat(store3.get(key), is(nullValue()));
         assertThat(store4.get(key), is(nullValue()));
     }
 
+    private static void until(final TestCondition condition) {

Review comment:
       Note, this is different than `TestUtils.waitForCondition`, which does the inverse thing. That one will retry on exceptions and otherwise verify that the return is `true`. We need to fail on exceptions and retry as long as the return is `false`.
   
   I opted to keep this method here, since it might be confusing next to the other util method.

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java
##########
@@ -117,17 +126,34 @@ public void shouldQueryOnlyActivePartitionStoresByDefault() throws Exception {
 
         // Assert that all messages in the first batch were processed in a timely manner
         assertThat(semaphore.tryAcquire(batch1NumMessages, 60, TimeUnit.SECONDS), is(equalTo(true)));
-        final KeyQueryMetadata keyQueryMetadata = kafkaStreams1.queryMetadataForKey(TABLE_NAME, key, (topic, somekey, value, numPartitions) -> 0);
-
-        final QueryableStoreType<ReadOnlyKeyValueStore<Integer, Integer>> queryableStoreType = QueryableStoreTypes.keyValueStore();
-        final ReadOnlyKeyValueStore<Integer, Integer> store1 = IntegrationTestUtils.getStore(TABLE_NAME, kafkaStreams1, queryableStoreType);
-        final ReadOnlyKeyValueStore<Integer, Integer> store2 = IntegrationTestUtils.getStore(TABLE_NAME, kafkaStreams2, queryableStoreType);
-
-        final boolean kafkaStreams1IsActive = (keyQueryMetadata.activeHost().port() % 2) == 1;
-
-        // Assert that only active is able to query for a key by default
-        assertThat(kafkaStreams1IsActive ? store1.get(key) : store2.get(key), is(notNullValue()));
-        assertThat(kafkaStreams1IsActive ? store2.get(key) : store1.get(key), is(nullValue()));
+        until(() -> {
+
+            final KeyQueryMetadata keyQueryMetadata = kafkaStreams1.queryMetadataForKey(TABLE_NAME, key, (topic, somekey, value, numPartitions) -> 0);
+
+            final QueryableStoreType<ReadOnlyKeyValueStore<Integer, Integer>> queryableStoreType = keyValueStore();
+            final ReadOnlyKeyValueStore<Integer, Integer> store1 = getStore(TABLE_NAME, kafkaStreams1, queryableStoreType);
+            final ReadOnlyKeyValueStore<Integer, Integer> store2 = getStore(TABLE_NAME, kafkaStreams2, queryableStoreType);
+
+            final boolean kafkaStreams1IsActive = (keyQueryMetadata.activeHost().port() % 2) == 1;
+
+            // Assert that only active is able to query for a key by default
+            assertThat(kafkaStreams1IsActive ? store1.get(key) : store2.get(key), is(notNullValue()));
+            try {
+                if (kafkaStreams1IsActive) {
+                    assertThat(store2.get(key), is(nullValue()));
+                } else {
+                    assertThat(store1.get(key), is(nullValue()));
+                }
+                return true;
+            } catch (final InvalidStateStoreException exception) {
+                assertThat(
+                    exception.getMessage(),
+                    containsString("Cannot get state store source-table because the stream thread is PARTITIONS_ASSIGNED, not RUNNING")
+                );
+                LOG.info("Streams wasn't running. Will try again.");
+                return false;

Review comment:
       This is the meat of this change. If we do get an exception, we can still verify the exception is the one we expected to get, and then we return `false` to indicate we should try again later to get a successful verification.

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java
##########
@@ -102,10 +111,10 @@ public void shouldQueryOnlyActivePartitionStoresByDefault() throws Exception {
 
         final StreamsBuilder builder = new StreamsBuilder();
         builder.table(INPUT_TOPIC_NAME, Consumed.with(Serdes.Integer(), Serdes.Integer()),
-                        Materialized.<Integer, Integer, KeyValueStore<Bytes, byte[]>>as(TABLE_NAME)
-                                .withCachingDisabled())
-                .toStream()
-                .peek((k, v) -> semaphore.release());
+                      Materialized.<Integer, Integer, KeyValueStore<Bytes, byte[]>>as(TABLE_NAME)
+                          .withCachingDisabled())
+               .toStream()
+               .peek((k, v) -> semaphore.release());

Review comment:
       I went ahead and fixed the whitespace also, since this PR is relatively small.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] chia7712 commented on a change in pull request #9521: KAFKA-10638: Fix QueryableStateIntegrationTest

Posted by GitBox <gi...@apache.org>.
chia7712 commented on a change in pull request #9521:
URL: https://github.com/apache/kafka/pull/9521#discussion_r513939906



##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java
##########
@@ -117,17 +126,34 @@ public void shouldQueryOnlyActivePartitionStoresByDefault() throws Exception {
 
         // Assert that all messages in the first batch were processed in a timely manner
         assertThat(semaphore.tryAcquire(batch1NumMessages, 60, TimeUnit.SECONDS), is(equalTo(true)));
-        final KeyQueryMetadata keyQueryMetadata = kafkaStreams1.queryMetadataForKey(TABLE_NAME, key, (topic, somekey, value, numPartitions) -> 0);
-
-        final QueryableStoreType<ReadOnlyKeyValueStore<Integer, Integer>> queryableStoreType = QueryableStoreTypes.keyValueStore();
-        final ReadOnlyKeyValueStore<Integer, Integer> store1 = IntegrationTestUtils.getStore(TABLE_NAME, kafkaStreams1, queryableStoreType);
-        final ReadOnlyKeyValueStore<Integer, Integer> store2 = IntegrationTestUtils.getStore(TABLE_NAME, kafkaStreams2, queryableStoreType);
-
-        final boolean kafkaStreams1IsActive = (keyQueryMetadata.activeHost().port() % 2) == 1;
-
-        // Assert that only active is able to query for a key by default
-        assertThat(kafkaStreams1IsActive ? store1.get(key) : store2.get(key), is(notNullValue()));
-        assertThat(kafkaStreams1IsActive ? store2.get(key) : store1.get(key), is(nullValue()));
+        until(() -> {
+
+            final KeyQueryMetadata keyQueryMetadata = kafkaStreams1.queryMetadataForKey(TABLE_NAME, key, (topic, somekey, value, numPartitions) -> 0);
+
+            final QueryableStoreType<ReadOnlyKeyValueStore<Integer, Integer>> queryableStoreType = keyValueStore();
+            final ReadOnlyKeyValueStore<Integer, Integer> store1 = getStore(TABLE_NAME, kafkaStreams1, queryableStoreType);
+            final ReadOnlyKeyValueStore<Integer, Integer> store2 = getStore(TABLE_NAME, kafkaStreams2, queryableStoreType);
+
+            final boolean kafkaStreams1IsActive = (keyQueryMetadata.activeHost().port() % 2) == 1;
+
+            // Assert that only active is able to query for a key by default
+            assertThat(kafkaStreams1IsActive ? store1.get(key) : store2.get(key), is(notNullValue()));
+            try {
+                if (kafkaStreams1IsActive) {
+                    assertThat(store2.get(key), is(nullValue()));
+                } else {
+                    assertThat(store1.get(key), is(nullValue()));
+                }
+                return true;
+            } catch (final InvalidStateStoreException exception) {

Review comment:
       Why not handling the ```InvalidStateStoreException``` in the helper method ```until```

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java
##########
@@ -153,51 +179,75 @@ public void shouldQuerySpecificActivePartitionStores() throws Exception {
 
         // Assert that all messages in the first batch were processed in a timely manner
         assertThat(semaphore.tryAcquire(batch1NumMessages, 60, TimeUnit.SECONDS), is(equalTo(true)));
-        final KeyQueryMetadata keyQueryMetadata = kafkaStreams1.queryMetadataForKey(TABLE_NAME, key, (topic, somekey, value, numPartitions) -> 0);
-
-        //key belongs to this partition
-        final int keyPartition = keyQueryMetadata.partition();
-
-        //key doesn't belongs to this partition
-        final int keyDontBelongPartition = (keyPartition == 0) ? 1 : 0;
-        final boolean kafkaStreams1IsActive = (keyQueryMetadata.activeHost().port() % 2) == 1;
-
-        StoreQueryParameters<ReadOnlyKeyValueStore<Integer, Integer>> storeQueryParam =
-            StoreQueryParameters.<ReadOnlyKeyValueStore<Integer, Integer>>fromNameAndType(TABLE_NAME, QueryableStoreTypes.keyValueStore())
-                .withPartition(keyPartition);
-        ReadOnlyKeyValueStore<Integer, Integer> store1 = null;
-        ReadOnlyKeyValueStore<Integer, Integer> store2 = null;
-        if (kafkaStreams1IsActive) {
-            store1 = IntegrationTestUtils.getStore(kafkaStreams1, storeQueryParam);
-        } else {
-            store2 = IntegrationTestUtils.getStore(kafkaStreams2, storeQueryParam);
-        }
-
-        if (kafkaStreams1IsActive) {
-            assertThat(store1, is(notNullValue()));
-            assertThat(store2, is(nullValue()));
-        } else {
-            assertThat(store2, is(notNullValue()));
-            assertThat(store1, is(nullValue()));
-        }
+        until(() -> {
+            final KeyQueryMetadata keyQueryMetadata = kafkaStreams1.queryMetadataForKey(TABLE_NAME, key, (topic, somekey, value, numPartitions) -> 0);
+
+            //key belongs to this partition
+            final int keyPartition = keyQueryMetadata.partition();
+
+            //key doesn't belongs to this partition
+            final int keyDontBelongPartition = (keyPartition == 0) ? 1 : 0;
+            final boolean kafkaStreams1IsActive = (keyQueryMetadata.activeHost().port() % 2) == 1;
+
+            final StoreQueryParameters<ReadOnlyKeyValueStore<Integer, Integer>> storeQueryParam =
+                StoreQueryParameters.<ReadOnlyKeyValueStore<Integer, Integer>>fromNameAndType(TABLE_NAME, keyValueStore())
+                    .withPartition(keyPartition);
+            ReadOnlyKeyValueStore<Integer, Integer> store1 = null;
+            ReadOnlyKeyValueStore<Integer, Integer> store2 = null;
+            if (kafkaStreams1IsActive) {
+                store1 = getStore(kafkaStreams1, storeQueryParam);
+            } else {
+                store2 = getStore(kafkaStreams2, storeQueryParam);
+            }
+
+            if (kafkaStreams1IsActive) {
+                assertThat(store1, is(notNullValue()));
+                assertThat(store2, is(nullValue()));
+            } else {
+                assertThat(store2, is(notNullValue()));
+                assertThat(store1, is(nullValue()));
+            }
+
+            // Assert that only active for a specific requested partition serves key if stale stores and not enabled
+            assertThat(kafkaStreams1IsActive ? store1.get(key) : store2.get(key), is(notNullValue()));
+
+            final StoreQueryParameters<ReadOnlyKeyValueStore<Integer, Integer>> storeQueryParam2 =
+                StoreQueryParameters.<ReadOnlyKeyValueStore<Integer, Integer>>fromNameAndType(TABLE_NAME, keyValueStore())
+                .withPartition(keyDontBelongPartition);
 
-        // Assert that only active for a specific requested partition serves key if stale stores and not enabled
-        assertThat(kafkaStreams1IsActive ? store1.get(key) : store2.get(key), is(notNullValue()));
 
-        storeQueryParam = StoreQueryParameters.<ReadOnlyKeyValueStore<Integer, Integer>>fromNameAndType(TABLE_NAME, QueryableStoreTypes.keyValueStore())
-            .withPartition(keyDontBelongPartition);
-        ReadOnlyKeyValueStore<Integer, Integer> store3 = null;
-        ReadOnlyKeyValueStore<Integer, Integer> store4 = null;
-        if (!kafkaStreams1IsActive) {
-            store3 = IntegrationTestUtils.getStore(kafkaStreams1, storeQueryParam);
-        } else {
-            store4 = IntegrationTestUtils.getStore(kafkaStreams2, storeQueryParam);
-        }
 
-        // Assert that key is not served when wrong specific partition is requested
-        // If kafkaStreams1 is active for keyPartition, kafkaStreams2 would be active for keyDontBelongPartition
-        // So, in that case, store3 would be null and the store4 would not return the value for key as wrong partition was requested
-        assertThat(kafkaStreams1IsActive ? store4.get(key) : store3.get(key), is(nullValue()));
+            try {
+                // Assert that key is not served when wrong specific partition is requested
+                // If kafkaStreams1 is active for keyPartition, kafkaStreams2 would be active for keyDontBelongPartition
+                // So, in that case, store3 would be null and the store4 would not return the value for key as wrong partition was requested
+                if (kafkaStreams1IsActive) {
+                    assertThat(getStore(kafkaStreams2, storeQueryParam2).get(key), is(nullValue()));
+                    final InvalidStateStoreException exception =
+                        assertThrows(InvalidStateStoreException.class, () -> getStore(kafkaStreams1, storeQueryParam2).get(key));
+                    assertThat(
+                        exception.getMessage(),
+                        containsString("The specified partition 1 for store source-table does not exist.")
+                    );
+                } else {
+                    assertThat(getStore(kafkaStreams1, storeQueryParam2).get(key), is(nullValue()));
+                    final InvalidStateStoreException exception =
+                        assertThrows(InvalidStateStoreException.class, () -> getStore(kafkaStreams2, storeQueryParam2).get(key));
+                    assertThat(
+                        exception.getMessage(),
+                        containsString("The specified partition 1 for store source-table does not exist.")
+                    );
+                }
+                return true;
+            } catch (final InvalidStateStoreException exception) {

Review comment:
       which method can throw ```InvalidStateStoreException``` in this case? It seems to me the potential methods are caught by ```assertThrows```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vvcephei commented on pull request #9521: KAFKA-10638: Fix QueryableStateIntegrationTest

Posted by GitBox <gi...@apache.org>.
vvcephei commented on pull request #9521:
URL: https://github.com/apache/kafka/pull/9521#issuecomment-718885978


   The tests passed, and I answered the two review questions. I'll go ahead and merge.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vvcephei commented on a change in pull request #9521: KAFKA-10638: Fix QueryableStateIntegrationTest

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #9521:
URL: https://github.com/apache/kafka/pull/9521#discussion_r514325710



##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java
##########
@@ -153,51 +179,75 @@ public void shouldQuerySpecificActivePartitionStores() throws Exception {
 
         // Assert that all messages in the first batch were processed in a timely manner
         assertThat(semaphore.tryAcquire(batch1NumMessages, 60, TimeUnit.SECONDS), is(equalTo(true)));
-        final KeyQueryMetadata keyQueryMetadata = kafkaStreams1.queryMetadataForKey(TABLE_NAME, key, (topic, somekey, value, numPartitions) -> 0);
-
-        //key belongs to this partition
-        final int keyPartition = keyQueryMetadata.partition();
-
-        //key doesn't belongs to this partition
-        final int keyDontBelongPartition = (keyPartition == 0) ? 1 : 0;
-        final boolean kafkaStreams1IsActive = (keyQueryMetadata.activeHost().port() % 2) == 1;
-
-        StoreQueryParameters<ReadOnlyKeyValueStore<Integer, Integer>> storeQueryParam =
-            StoreQueryParameters.<ReadOnlyKeyValueStore<Integer, Integer>>fromNameAndType(TABLE_NAME, QueryableStoreTypes.keyValueStore())
-                .withPartition(keyPartition);
-        ReadOnlyKeyValueStore<Integer, Integer> store1 = null;
-        ReadOnlyKeyValueStore<Integer, Integer> store2 = null;
-        if (kafkaStreams1IsActive) {
-            store1 = IntegrationTestUtils.getStore(kafkaStreams1, storeQueryParam);
-        } else {
-            store2 = IntegrationTestUtils.getStore(kafkaStreams2, storeQueryParam);
-        }
-
-        if (kafkaStreams1IsActive) {
-            assertThat(store1, is(notNullValue()));
-            assertThat(store2, is(nullValue()));
-        } else {
-            assertThat(store2, is(notNullValue()));
-            assertThat(store1, is(nullValue()));
-        }
+        until(() -> {
+            final KeyQueryMetadata keyQueryMetadata = kafkaStreams1.queryMetadataForKey(TABLE_NAME, key, (topic, somekey, value, numPartitions) -> 0);
+
+            //key belongs to this partition
+            final int keyPartition = keyQueryMetadata.partition();
+
+            //key doesn't belongs to this partition
+            final int keyDontBelongPartition = (keyPartition == 0) ? 1 : 0;
+            final boolean kafkaStreams1IsActive = (keyQueryMetadata.activeHost().port() % 2) == 1;
+
+            final StoreQueryParameters<ReadOnlyKeyValueStore<Integer, Integer>> storeQueryParam =
+                StoreQueryParameters.<ReadOnlyKeyValueStore<Integer, Integer>>fromNameAndType(TABLE_NAME, keyValueStore())
+                    .withPartition(keyPartition);
+            ReadOnlyKeyValueStore<Integer, Integer> store1 = null;
+            ReadOnlyKeyValueStore<Integer, Integer> store2 = null;
+            if (kafkaStreams1IsActive) {
+                store1 = getStore(kafkaStreams1, storeQueryParam);
+            } else {
+                store2 = getStore(kafkaStreams2, storeQueryParam);
+            }
+
+            if (kafkaStreams1IsActive) {
+                assertThat(store1, is(notNullValue()));
+                assertThat(store2, is(nullValue()));
+            } else {
+                assertThat(store2, is(notNullValue()));
+                assertThat(store1, is(nullValue()));
+            }
+
+            // Assert that only active for a specific requested partition serves key if stale stores and not enabled
+            assertThat(kafkaStreams1IsActive ? store1.get(key) : store2.get(key), is(notNullValue()));
+
+            final StoreQueryParameters<ReadOnlyKeyValueStore<Integer, Integer>> storeQueryParam2 =
+                StoreQueryParameters.<ReadOnlyKeyValueStore<Integer, Integer>>fromNameAndType(TABLE_NAME, keyValueStore())
+                .withPartition(keyDontBelongPartition);
 
-        // Assert that only active for a specific requested partition serves key if stale stores and not enabled
-        assertThat(kafkaStreams1IsActive ? store1.get(key) : store2.get(key), is(notNullValue()));
 
-        storeQueryParam = StoreQueryParameters.<ReadOnlyKeyValueStore<Integer, Integer>>fromNameAndType(TABLE_NAME, QueryableStoreTypes.keyValueStore())
-            .withPartition(keyDontBelongPartition);
-        ReadOnlyKeyValueStore<Integer, Integer> store3 = null;
-        ReadOnlyKeyValueStore<Integer, Integer> store4 = null;
-        if (!kafkaStreams1IsActive) {
-            store3 = IntegrationTestUtils.getStore(kafkaStreams1, storeQueryParam);
-        } else {
-            store4 = IntegrationTestUtils.getStore(kafkaStreams2, storeQueryParam);
-        }
 
-        // Assert that key is not served when wrong specific partition is requested
-        // If kafkaStreams1 is active for keyPartition, kafkaStreams2 would be active for keyDontBelongPartition
-        // So, in that case, store3 would be null and the store4 would not return the value for key as wrong partition was requested
-        assertThat(kafkaStreams1IsActive ? store4.get(key) : store3.get(key), is(nullValue()));
+            try {
+                // Assert that key is not served when wrong specific partition is requested
+                // If kafkaStreams1 is active for keyPartition, kafkaStreams2 would be active for keyDontBelongPartition
+                // So, in that case, store3 would be null and the store4 would not return the value for key as wrong partition was requested
+                if (kafkaStreams1IsActive) {
+                    assertThat(getStore(kafkaStreams2, storeQueryParam2).get(key), is(nullValue()));
+                    final InvalidStateStoreException exception =
+                        assertThrows(InvalidStateStoreException.class, () -> getStore(kafkaStreams1, storeQueryParam2).get(key));
+                    assertThat(
+                        exception.getMessage(),
+                        containsString("The specified partition 1 for store source-table does not exist.")
+                    );
+                } else {
+                    assertThat(getStore(kafkaStreams1, storeQueryParam2).get(key), is(nullValue()));
+                    final InvalidStateStoreException exception =
+                        assertThrows(InvalidStateStoreException.class, () -> getStore(kafkaStreams2, storeQueryParam2).get(key));
+                    assertThat(
+                        exception.getMessage(),
+                        containsString("The specified partition 1 for store source-table does not exist.")
+                    );
+                }
+                return true;
+            } catch (final InvalidStateStoreException exception) {

Review comment:
       It's a little subtle, but each block of the `if` has two verifications: that it can fetch data from one of the instances and that it gets an exception from the other instance. If something goes wrong the "can fetch data" verification could throw.
   
   E.g., `assertThat(getStore(kafkaStreams1, storeQueryParam2).get(key), is(nullValue()));` could throw.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org