You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/10/29 04:03:25 UTC

[GitHub] [kafka] chia7712 commented on a change in pull request #9521: KAFKA-10638: Fix QueryableStateIntegrationTest

chia7712 commented on a change in pull request #9521:
URL: https://github.com/apache/kafka/pull/9521#discussion_r513939906



##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java
##########
@@ -117,17 +126,34 @@ public void shouldQueryOnlyActivePartitionStoresByDefault() throws Exception {
 
         // Assert that all messages in the first batch were processed in a timely manner
         assertThat(semaphore.tryAcquire(batch1NumMessages, 60, TimeUnit.SECONDS), is(equalTo(true)));
-        final KeyQueryMetadata keyQueryMetadata = kafkaStreams1.queryMetadataForKey(TABLE_NAME, key, (topic, somekey, value, numPartitions) -> 0);
-
-        final QueryableStoreType<ReadOnlyKeyValueStore<Integer, Integer>> queryableStoreType = QueryableStoreTypes.keyValueStore();
-        final ReadOnlyKeyValueStore<Integer, Integer> store1 = IntegrationTestUtils.getStore(TABLE_NAME, kafkaStreams1, queryableStoreType);
-        final ReadOnlyKeyValueStore<Integer, Integer> store2 = IntegrationTestUtils.getStore(TABLE_NAME, kafkaStreams2, queryableStoreType);
-
-        final boolean kafkaStreams1IsActive = (keyQueryMetadata.activeHost().port() % 2) == 1;
-
-        // Assert that only active is able to query for a key by default
-        assertThat(kafkaStreams1IsActive ? store1.get(key) : store2.get(key), is(notNullValue()));
-        assertThat(kafkaStreams1IsActive ? store2.get(key) : store1.get(key), is(nullValue()));
+        until(() -> {
+
+            final KeyQueryMetadata keyQueryMetadata = kafkaStreams1.queryMetadataForKey(TABLE_NAME, key, (topic, somekey, value, numPartitions) -> 0);
+
+            final QueryableStoreType<ReadOnlyKeyValueStore<Integer, Integer>> queryableStoreType = keyValueStore();
+            final ReadOnlyKeyValueStore<Integer, Integer> store1 = getStore(TABLE_NAME, kafkaStreams1, queryableStoreType);
+            final ReadOnlyKeyValueStore<Integer, Integer> store2 = getStore(TABLE_NAME, kafkaStreams2, queryableStoreType);
+
+            final boolean kafkaStreams1IsActive = (keyQueryMetadata.activeHost().port() % 2) == 1;
+
+            // Assert that only active is able to query for a key by default
+            assertThat(kafkaStreams1IsActive ? store1.get(key) : store2.get(key), is(notNullValue()));
+            try {
+                if (kafkaStreams1IsActive) {
+                    assertThat(store2.get(key), is(nullValue()));
+                } else {
+                    assertThat(store1.get(key), is(nullValue()));
+                }
+                return true;
+            } catch (final InvalidStateStoreException exception) {

Review comment:
       Why not handling the ```InvalidStateStoreException``` in the helper method ```until```

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java
##########
@@ -153,51 +179,75 @@ public void shouldQuerySpecificActivePartitionStores() throws Exception {
 
         // Assert that all messages in the first batch were processed in a timely manner
         assertThat(semaphore.tryAcquire(batch1NumMessages, 60, TimeUnit.SECONDS), is(equalTo(true)));
-        final KeyQueryMetadata keyQueryMetadata = kafkaStreams1.queryMetadataForKey(TABLE_NAME, key, (topic, somekey, value, numPartitions) -> 0);
-
-        //key belongs to this partition
-        final int keyPartition = keyQueryMetadata.partition();
-
-        //key doesn't belongs to this partition
-        final int keyDontBelongPartition = (keyPartition == 0) ? 1 : 0;
-        final boolean kafkaStreams1IsActive = (keyQueryMetadata.activeHost().port() % 2) == 1;
-
-        StoreQueryParameters<ReadOnlyKeyValueStore<Integer, Integer>> storeQueryParam =
-            StoreQueryParameters.<ReadOnlyKeyValueStore<Integer, Integer>>fromNameAndType(TABLE_NAME, QueryableStoreTypes.keyValueStore())
-                .withPartition(keyPartition);
-        ReadOnlyKeyValueStore<Integer, Integer> store1 = null;
-        ReadOnlyKeyValueStore<Integer, Integer> store2 = null;
-        if (kafkaStreams1IsActive) {
-            store1 = IntegrationTestUtils.getStore(kafkaStreams1, storeQueryParam);
-        } else {
-            store2 = IntegrationTestUtils.getStore(kafkaStreams2, storeQueryParam);
-        }
-
-        if (kafkaStreams1IsActive) {
-            assertThat(store1, is(notNullValue()));
-            assertThat(store2, is(nullValue()));
-        } else {
-            assertThat(store2, is(notNullValue()));
-            assertThat(store1, is(nullValue()));
-        }
+        until(() -> {
+            final KeyQueryMetadata keyQueryMetadata = kafkaStreams1.queryMetadataForKey(TABLE_NAME, key, (topic, somekey, value, numPartitions) -> 0);
+
+            //key belongs to this partition
+            final int keyPartition = keyQueryMetadata.partition();
+
+            //key doesn't belongs to this partition
+            final int keyDontBelongPartition = (keyPartition == 0) ? 1 : 0;
+            final boolean kafkaStreams1IsActive = (keyQueryMetadata.activeHost().port() % 2) == 1;
+
+            final StoreQueryParameters<ReadOnlyKeyValueStore<Integer, Integer>> storeQueryParam =
+                StoreQueryParameters.<ReadOnlyKeyValueStore<Integer, Integer>>fromNameAndType(TABLE_NAME, keyValueStore())
+                    .withPartition(keyPartition);
+            ReadOnlyKeyValueStore<Integer, Integer> store1 = null;
+            ReadOnlyKeyValueStore<Integer, Integer> store2 = null;
+            if (kafkaStreams1IsActive) {
+                store1 = getStore(kafkaStreams1, storeQueryParam);
+            } else {
+                store2 = getStore(kafkaStreams2, storeQueryParam);
+            }
+
+            if (kafkaStreams1IsActive) {
+                assertThat(store1, is(notNullValue()));
+                assertThat(store2, is(nullValue()));
+            } else {
+                assertThat(store2, is(notNullValue()));
+                assertThat(store1, is(nullValue()));
+            }
+
+            // Assert that only active for a specific requested partition serves key if stale stores and not enabled
+            assertThat(kafkaStreams1IsActive ? store1.get(key) : store2.get(key), is(notNullValue()));
+
+            final StoreQueryParameters<ReadOnlyKeyValueStore<Integer, Integer>> storeQueryParam2 =
+                StoreQueryParameters.<ReadOnlyKeyValueStore<Integer, Integer>>fromNameAndType(TABLE_NAME, keyValueStore())
+                .withPartition(keyDontBelongPartition);
 
-        // Assert that only active for a specific requested partition serves key if stale stores and not enabled
-        assertThat(kafkaStreams1IsActive ? store1.get(key) : store2.get(key), is(notNullValue()));
 
-        storeQueryParam = StoreQueryParameters.<ReadOnlyKeyValueStore<Integer, Integer>>fromNameAndType(TABLE_NAME, QueryableStoreTypes.keyValueStore())
-            .withPartition(keyDontBelongPartition);
-        ReadOnlyKeyValueStore<Integer, Integer> store3 = null;
-        ReadOnlyKeyValueStore<Integer, Integer> store4 = null;
-        if (!kafkaStreams1IsActive) {
-            store3 = IntegrationTestUtils.getStore(kafkaStreams1, storeQueryParam);
-        } else {
-            store4 = IntegrationTestUtils.getStore(kafkaStreams2, storeQueryParam);
-        }
 
-        // Assert that key is not served when wrong specific partition is requested
-        // If kafkaStreams1 is active for keyPartition, kafkaStreams2 would be active for keyDontBelongPartition
-        // So, in that case, store3 would be null and the store4 would not return the value for key as wrong partition was requested
-        assertThat(kafkaStreams1IsActive ? store4.get(key) : store3.get(key), is(nullValue()));
+            try {
+                // Assert that key is not served when wrong specific partition is requested
+                // If kafkaStreams1 is active for keyPartition, kafkaStreams2 would be active for keyDontBelongPartition
+                // So, in that case, store3 would be null and the store4 would not return the value for key as wrong partition was requested
+                if (kafkaStreams1IsActive) {
+                    assertThat(getStore(kafkaStreams2, storeQueryParam2).get(key), is(nullValue()));
+                    final InvalidStateStoreException exception =
+                        assertThrows(InvalidStateStoreException.class, () -> getStore(kafkaStreams1, storeQueryParam2).get(key));
+                    assertThat(
+                        exception.getMessage(),
+                        containsString("The specified partition 1 for store source-table does not exist.")
+                    );
+                } else {
+                    assertThat(getStore(kafkaStreams1, storeQueryParam2).get(key), is(nullValue()));
+                    final InvalidStateStoreException exception =
+                        assertThrows(InvalidStateStoreException.class, () -> getStore(kafkaStreams2, storeQueryParam2).get(key));
+                    assertThat(
+                        exception.getMessage(),
+                        containsString("The specified partition 1 for store source-table does not exist.")
+                    );
+                }
+                return true;
+            } catch (final InvalidStateStoreException exception) {

Review comment:
       which method can throw ```InvalidStateStoreException``` in this case? It seems to me the potential methods are caught by ```assertThrows```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org