Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/06/24 13:19:30 UTC

[GitHub] [kafka] cadonna commented on a diff in pull request #12289: KAFKA-13957: Fix flaky shouldQuerySpecificActivePartitionStores test

cadonna commented on code in PR #12289:
URL: https://github.com/apache/kafka/pull/12289#discussion_r906002582


##########
streams/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java:
##########
@@ -210,39 +213,33 @@ public void shouldQuerySpecificActivePartitionStores() throws Exception {
             }
 
             final StoreQueryParameters<ReadOnlyKeyValueStore<Integer, Integer>> storeQueryParam2 =
-                StoreQueryParameters.<ReadOnlyKeyValueStore<Integer, Integer>>fromNameAndType(TABLE_NAME, keyValueStore())
-                .withPartition(keyDontBelongPartition);
-
-            try {
-                // Assert that key is not served when wrong specific partition is requested
-                // If kafkaStreams1 is active for keyPartition, kafkaStreams2 would be active for keyDontBelongPartition
-                // So, in that case, store3 would be null and the store4 would not return the value for key as wrong partition was requested
-                if (kafkaStreams1IsActive) {
-                    assertThat(store1.get(key), is(notNullValue()));
-                    assertThat(getStore(kafkaStreams2, storeQueryParam2).get(key), is(nullValue()));
-                    final InvalidStateStoreException exception =
+                    StoreQueryParameters.<ReadOnlyKeyValueStore<Integer, Integer>>fromNameAndType(TABLE_NAME, keyValueStore())
+                            .withPartition(keyDontBelongPartition);
+            // Assert that key is not served when wrong specific partition is requested
+            // If kafkaStreams1 is active for keyPartition, kafkaStreams2 would be active for keyDontBelongPartition
+            // So, in that case, store3 would be null and the store4 would not return the value for key as wrong partition was requested
+            if (kafkaStreams1IsActive) {
+                assertThat(store1.get(key), is(notNullValue()));
+                assertThat(getStore(kafkaStreams2, storeQueryParam2).get(key), is(nullValue()));
+                final InvalidStateStoreException exception =
                         assertThrows(InvalidStateStoreException.class, () -> getStore(kafkaStreams1, storeQueryParam2).get(key));
-                    assertThat(
+

Review Comment:
   ```suggestion
   ```



##########
streams/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java:
##########
@@ -210,39 +213,33 @@ public void shouldQuerySpecificActivePartitionStores() throws Exception {
             }
 
             final StoreQueryParameters<ReadOnlyKeyValueStore<Integer, Integer>> storeQueryParam2 =
-                StoreQueryParameters.<ReadOnlyKeyValueStore<Integer, Integer>>fromNameAndType(TABLE_NAME, keyValueStore())
-                .withPartition(keyDontBelongPartition);
-
-            try {
-                // Assert that key is not served when wrong specific partition is requested
-                // If kafkaStreams1 is active for keyPartition, kafkaStreams2 would be active for keyDontBelongPartition
-                // So, in that case, store3 would be null and the store4 would not return the value for key as wrong partition was requested
-                if (kafkaStreams1IsActive) {
-                    assertThat(store1.get(key), is(notNullValue()));
-                    assertThat(getStore(kafkaStreams2, storeQueryParam2).get(key), is(nullValue()));
-                    final InvalidStateStoreException exception =
+                    StoreQueryParameters.<ReadOnlyKeyValueStore<Integer, Integer>>fromNameAndType(TABLE_NAME, keyValueStore())
+                            .withPartition(keyDontBelongPartition);
+            // Assert that key is not served when wrong specific partition is requested
+            // If kafkaStreams1 is active for keyPartition, kafkaStreams2 would be active for keyDontBelongPartition
+            // So, in that case, store3 would be null and the store4 would not return the value for key as wrong partition was requested
+            if (kafkaStreams1IsActive) {
+                assertThat(store1.get(key), is(notNullValue()));
+                assertThat(getStore(kafkaStreams2, storeQueryParam2).get(key), is(nullValue()));
+                final InvalidStateStoreException exception =
                         assertThrows(InvalidStateStoreException.class, () -> getStore(kafkaStreams1, storeQueryParam2).get(key));
-                    assertThat(
+
+                assertThat(
                         exception.getMessage(),
                         containsString("The specified partition 1 for store source-table does not exist.")

Review Comment:
   ```suggestion
                   assertThat(
                       exception.getMessage(),
                       containsString("The specified partition 1 for store source-table does not exist.")
   ```



##########
streams/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java:
##########
@@ -210,39 +213,33 @@ public void shouldQuerySpecificActivePartitionStores() throws Exception {
             }
 
             final StoreQueryParameters<ReadOnlyKeyValueStore<Integer, Integer>> storeQueryParam2 =
-                StoreQueryParameters.<ReadOnlyKeyValueStore<Integer, Integer>>fromNameAndType(TABLE_NAME, keyValueStore())
-                .withPartition(keyDontBelongPartition);
-
-            try {
-                // Assert that key is not served when wrong specific partition is requested
-                // If kafkaStreams1 is active for keyPartition, kafkaStreams2 would be active for keyDontBelongPartition
-                // So, in that case, store3 would be null and the store4 would not return the value for key as wrong partition was requested
-                if (kafkaStreams1IsActive) {
-                    assertThat(store1.get(key), is(notNullValue()));
-                    assertThat(getStore(kafkaStreams2, storeQueryParam2).get(key), is(nullValue()));
-                    final InvalidStateStoreException exception =
+                    StoreQueryParameters.<ReadOnlyKeyValueStore<Integer, Integer>>fromNameAndType(TABLE_NAME, keyValueStore())
+                            .withPartition(keyDontBelongPartition);
+            // Assert that key is not served when wrong specific partition is requested
+            // If kafkaStreams1 is active for keyPartition, kafkaStreams2 would be active for keyDontBelongPartition
+            // So, in that case, store3 would be null and the store4 would not return the value for key as wrong partition was requested
+            if (kafkaStreams1IsActive) {

Review Comment:
   I am wondering why we have the same if-statement `if (kafkaStreams1IsActive)` three times. Can we not merge the three if-statements into one?



##########
streams/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java:
##########
@@ -535,17 +532,26 @@ public void shouldQueryStoresAfterAddingAndRemovingStreamThread() throws Excepti
         });
     }
 
+    private Matcher<String> retrievableException() {
+        return is(
+                anyOf(
+                        containsString("Cannot get state store source-table because the stream thread is PARTITIONS_ASSIGNED, not RUNNING"),
+                        containsString("The state store, source-table, may have migrated to another instance"),
+                        containsString("Cannot get state store source-table because the stream thread is STARTING, not RUNNING"),
+                        containsString("The specified partition 1 for store source-table does not exist.")
+                )
+        );
+    }

Review Comment:
   Fix for indentation and typo
   ```suggestion
       private Matcher<String> retriableException() {
           return is(
               anyOf(
                   containsString("Cannot get state store source-table because the stream thread is PARTITIONS_ASSIGNED, not RUNNING"),
                   containsString("The state store, source-table, may have migrated to another instance"),
                   containsString("Cannot get state store source-table because the stream thread is STARTING, not RUNNING"),
                   containsString("The specified partition 1 for store source-table does not exist.")
               )
           );
       }
   ```
   Could you please fix the typo in the whole class? It should be `retriable`. I think the typo mostly existed before this PR.
   Could you please also fix the indentation in the whole class? We use 4 spaces.



##########
streams/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java:
##########
@@ -210,39 +213,33 @@ public void shouldQuerySpecificActivePartitionStores() throws Exception {
             }
 
             final StoreQueryParameters<ReadOnlyKeyValueStore<Integer, Integer>> storeQueryParam2 =
-                StoreQueryParameters.<ReadOnlyKeyValueStore<Integer, Integer>>fromNameAndType(TABLE_NAME, keyValueStore())
-                .withPartition(keyDontBelongPartition);
-
-            try {
-                // Assert that key is not served when wrong specific partition is requested
-                // If kafkaStreams1 is active for keyPartition, kafkaStreams2 would be active for keyDontBelongPartition
-                // So, in that case, store3 would be null and the store4 would not return the value for key as wrong partition was requested
-                if (kafkaStreams1IsActive) {
-                    assertThat(store1.get(key), is(notNullValue()));
-                    assertThat(getStore(kafkaStreams2, storeQueryParam2).get(key), is(nullValue()));
-                    final InvalidStateStoreException exception =
+                    StoreQueryParameters.<ReadOnlyKeyValueStore<Integer, Integer>>fromNameAndType(TABLE_NAME, keyValueStore())
+                            .withPartition(keyDontBelongPartition);
+            // Assert that key is not served when wrong specific partition is requested
+            // If kafkaStreams1 is active for keyPartition, kafkaStreams2 would be active for keyDontBelongPartition
+            // So, in that case, store3 would be null and the store4 would not return the value for key as wrong partition was requested
+            if (kafkaStreams1IsActive) {
+                assertThat(store1.get(key), is(notNullValue()));
+                assertThat(getStore(kafkaStreams2, storeQueryParam2).get(key), is(nullValue()));
+                final InvalidStateStoreException exception =
                         assertThrows(InvalidStateStoreException.class, () -> getStore(kafkaStreams1, storeQueryParam2).get(key));
-                    assertThat(
+
+                assertThat(
                         exception.getMessage(),
                         containsString("The specified partition 1 for store source-table does not exist.")
-                    );
-                } else {
-                    assertThat(store2.get(key), is(notNullValue()));
-                    assertThat(getStore(kafkaStreams1, storeQueryParam2).get(key), is(nullValue()));
-                    final InvalidStateStoreException exception =
+                );
+            } else {
+                assertThat(store2.get(key), is(notNullValue()));
+                assertThat(getStore(kafkaStreams1, storeQueryParam2).get(key), is(nullValue()));
+                final InvalidStateStoreException exception =
                         assertThrows(InvalidStateStoreException.class, () -> getStore(kafkaStreams2, storeQueryParam2).get(key));
-                    assertThat(
+                assertThat(
                         exception.getMessage(),
                         containsString("The specified partition 1 for store source-table does not exist.")
-                    );
-                }
-                return true;
-            } catch (final InvalidStateStoreException exception) {
-                verifyRetrievableException(exception);
-                LOG.info("Either streams wasn't running or a re-balancing took place. Will try again.");
-                return false;
+                );
             }
-        });
+            return true;

Review Comment:
   Why do we still need to return something? With your change, we would retry until we no longer encounter any exception.
   Maybe it would be better to extend `retryOnExceptionWithTimeout()` (see https://github.com/apache/kafka/blob/e4b3a3cdeb295fdd4c4434ec1a7ee77b66553ae0/clients/src/test/java/org/apache/kafka/test/TestUtils.java#L361) to include the exception for which to retry.
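
To illustrate the suggestion above, here is a minimal sketch of a retry helper that takes the exception type to retry on. It is not the existing `TestUtils.retryOnExceptionWithTimeout()` and not its signature; the class `RetrySketch`, the interface `ThrowingRunnable`, and the `retriableType` parameter are hypothetical names chosen for this example. The idea is that only the named exception type triggers a retry, while any other exception (and any `AssertionError`, which is not an `Exception`) propagates immediately.

```java
public final class RetrySketch {

    /** Hypothetical stand-in for a test body that may throw. */
    @FunctionalInterface
    public interface ThrowingRunnable {
        void run() throws Exception;
    }

    private RetrySketch() { }

    /**
     * Runs the body until it completes without throwing, retrying only when
     * the thrown exception is an instance of retriableType.
     * Assumption: this is an illustrative helper, not the Kafka test utility.
     */
    public static void retryOnExceptionWithTimeout(
            final long timeoutMs,
            final long pollIntervalMs,
            final Class<? extends Exception> retriableType,
            final ThrowingRunnable runnable) throws Exception {
        final long deadline = System.currentTimeMillis() + timeoutMs;
        while (true) {
            try {
                runnable.run();
                return; // success: nothing to return to the caller
            } catch (final Exception e) {
                if (!retriableType.isInstance(e)) {
                    throw e; // unexpected exception type: fail fast
                }
                if (System.currentTimeMillis() + pollIntervalMs > deadline) {
                    // keep the last retriable exception as the cause
                    throw new AssertionError("Timed out after " + timeoutMs + " ms", e);
                }
            }
            Thread.sleep(pollIntervalMs);
        }
    }
}
```

With a helper of this shape, the test body would not need to return a boolean. It would pass something like `InvalidStateStoreException.class` as the retriable type and let failed assertions surface on the first attempt.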



##########
streams/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java:
##########
@@ -535,17 +532,26 @@ public void shouldQueryStoresAfterAddingAndRemovingStreamThread() throws Excepti
         });
     }
 
+    private Matcher<String> retrievableException() {
+        return is(
+                anyOf(
+                        containsString("Cannot get state store source-table because the stream thread is PARTITIONS_ASSIGNED, not RUNNING"),
+                        containsString("The state store, source-table, may have migrated to another instance"),
+                        containsString("Cannot get state store source-table because the stream thread is STARTING, not RUNNING"),
+                        containsString("The specified partition 1 for store source-table does not exist.")

Review Comment:
   I see that this might happen, but I am wondering whether we can solve it differently. The reason is that a bug could also throw that exception, and we would just retry. Once the timeout is exceeded, we would probably realize that it is a bug, but that seems really hard to investigate.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
