You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/05/19 22:18:43 UTC

[GitHub] [kafka] vvcephei commented on a diff in pull request #12186: MINOR: Deflake OptimizedKTableIntegrationTest

vvcephei commented on code in PR #12186:
URL: https://github.com/apache/kafka/pull/12186#discussion_r877582344


##########
streams/src/test/java/org/apache/kafka/streams/integration/OptimizedKTableIntegrationTest.java:
##########
@@ -125,31 +131,37 @@ public void shouldApplyUpdatesToStandbyStore() throws Exception {
         // Assert that all messages in the first batch were processed in a timely manner
         assertThat(semaphore.tryAcquire(batch1NumMessages, 60, TimeUnit.SECONDS), is(equalTo(true)));
 
-        final ReadOnlyKeyValueStore<Integer, Integer> store1 = IntegrationTestUtils.getStore(TABLE_NAME, kafkaStreams1, QueryableStoreTypes.keyValueStore());
-        final ReadOnlyKeyValueStore<Integer, Integer> store2 = IntegrationTestUtils.getStore(TABLE_NAME, kafkaStreams2, QueryableStoreTypes.keyValueStore());
-
-        final boolean kafkaStreams1WasFirstActive;
-        final KeyQueryMetadata keyQueryMetadata = kafkaStreams1.queryMetadataForKey(TABLE_NAME, key, (topic, somekey, value, numPartitions) -> 0);
-
-        // Assert that the current value in store reflects all messages being processed
-        if ((keyQueryMetadata.activeHost().port() % 2) == 1) {
-            assertThat(store1.get(key), is(equalTo(batch1NumMessages - 1)));
-            kafkaStreams1WasFirstActive = true;
-        } else {
-            assertThat(store2.get(key), is(equalTo(batch1NumMessages - 1)));
-            kafkaStreams1WasFirstActive = false;
-        }
-
-        if (kafkaStreams1WasFirstActive) {
-            kafkaStreams1.close();
-        } else {
-            kafkaStreams2.close();
-        }
+        final AtomicReference<ReadOnlyKeyValueStore<Integer, Integer>> newActiveStore = new AtomicReference<>(null);
+        TestUtils.retryOnExceptionWithTimeout(() -> {
+            final ReadOnlyKeyValueStore<Integer, Integer> store1 = IntegrationTestUtils.getStore(TABLE_NAME, kafkaStreams1, QueryableStoreTypes.keyValueStore());
+            final ReadOnlyKeyValueStore<Integer, Integer> store2 = IntegrationTestUtils.getStore(TABLE_NAME, kafkaStreams2, QueryableStoreTypes.keyValueStore());
+
+            final KeyQueryMetadata keyQueryMetadata = kafkaStreams1.queryMetadataForKey(TABLE_NAME, key, (topic, somekey, value, numPartitions) -> 0);
+
+            try {
+                // Assert that the current value in store reflects all messages being processed
+                if ((keyQueryMetadata.activeHost().port() % 2) == 1) {
+                    assertThat(store1.get(key), is(equalTo(batch1NumMessages - 1)));
+                    kafkaStreams1.close();
+                    newActiveStore.set(store2);
+                } else {
+                    assertThat(store2.get(key), is(equalTo(batch1NumMessages - 1)));
+                    kafkaStreams2.close();
+                    newActiveStore.set(store1);
+                }
+            } catch (final InvalidStateStoreException e) {
+                LOG.warn("Detected an unexpected rebalance during test. Retrying if possible.", e);
+                throw e;

Review Comment:
   Rethrowing the `InvalidStateStoreException` here triggers the retry logic in `TestUtils.retryOnExceptionWithTimeout`.



##########
streams/src/test/java/org/apache/kafka/streams/integration/OptimizedKTableIntegrationTest.java:
##########
@@ -125,31 +131,37 @@ public void shouldApplyUpdatesToStandbyStore() throws Exception {
         // Assert that all messages in the first batch were processed in a timely manner
         assertThat(semaphore.tryAcquire(batch1NumMessages, 60, TimeUnit.SECONDS), is(equalTo(true)));
 
-        final ReadOnlyKeyValueStore<Integer, Integer> store1 = IntegrationTestUtils.getStore(TABLE_NAME, kafkaStreams1, QueryableStoreTypes.keyValueStore());
-        final ReadOnlyKeyValueStore<Integer, Integer> store2 = IntegrationTestUtils.getStore(TABLE_NAME, kafkaStreams2, QueryableStoreTypes.keyValueStore());
-
-        final boolean kafkaStreams1WasFirstActive;
-        final KeyQueryMetadata keyQueryMetadata = kafkaStreams1.queryMetadataForKey(TABLE_NAME, key, (topic, somekey, value, numPartitions) -> 0);
-
-        // Assert that the current value in store reflects all messages being processed
-        if ((keyQueryMetadata.activeHost().port() % 2) == 1) {
-            assertThat(store1.get(key), is(equalTo(batch1NumMessages - 1)));
-            kafkaStreams1WasFirstActive = true;
-        } else {
-            assertThat(store2.get(key), is(equalTo(batch1NumMessages - 1)));
-            kafkaStreams1WasFirstActive = false;
-        }
-
-        if (kafkaStreams1WasFirstActive) {
-            kafkaStreams1.close();
-        } else {
-            kafkaStreams2.close();
-        }
+        final AtomicReference<ReadOnlyKeyValueStore<Integer, Integer>> newActiveStore = new AtomicReference<>(null);
+        TestUtils.retryOnExceptionWithTimeout(() -> {
+            final ReadOnlyKeyValueStore<Integer, Integer> store1 = IntegrationTestUtils.getStore(TABLE_NAME, kafkaStreams1, QueryableStoreTypes.keyValueStore());
+            final ReadOnlyKeyValueStore<Integer, Integer> store2 = IntegrationTestUtils.getStore(TABLE_NAME, kafkaStreams2, QueryableStoreTypes.keyValueStore());

Review Comment:
   Depending on where and when the rebalance happens, we might need to re-resolve the stores, so I moved the store lookups into the retry block for resilience.



##########
streams/src/test/java/org/apache/kafka/streams/integration/OptimizedKTableIntegrationTest.java:
##########
@@ -125,31 +131,37 @@ public void shouldApplyUpdatesToStandbyStore() throws Exception {
         // Assert that all messages in the first batch were processed in a timely manner
         assertThat(semaphore.tryAcquire(batch1NumMessages, 60, TimeUnit.SECONDS), is(equalTo(true)));
 
-        final ReadOnlyKeyValueStore<Integer, Integer> store1 = IntegrationTestUtils.getStore(TABLE_NAME, kafkaStreams1, QueryableStoreTypes.keyValueStore());
-        final ReadOnlyKeyValueStore<Integer, Integer> store2 = IntegrationTestUtils.getStore(TABLE_NAME, kafkaStreams2, QueryableStoreTypes.keyValueStore());
-
-        final boolean kafkaStreams1WasFirstActive;
-        final KeyQueryMetadata keyQueryMetadata = kafkaStreams1.queryMetadataForKey(TABLE_NAME, key, (topic, somekey, value, numPartitions) -> 0);
-
-        // Assert that the current value in store reflects all messages being processed
-        if ((keyQueryMetadata.activeHost().port() % 2) == 1) {
-            assertThat(store1.get(key), is(equalTo(batch1NumMessages - 1)));
-            kafkaStreams1WasFirstActive = true;
-        } else {
-            assertThat(store2.get(key), is(equalTo(batch1NumMessages - 1)));
-            kafkaStreams1WasFirstActive = false;
-        }
-
-        if (kafkaStreams1WasFirstActive) {
-            kafkaStreams1.close();
-        } else {
-            kafkaStreams2.close();
-        }
+        final AtomicReference<ReadOnlyKeyValueStore<Integer, Integer>> newActiveStore = new AtomicReference<>(null);
+        TestUtils.retryOnExceptionWithTimeout(() -> {
+            final ReadOnlyKeyValueStore<Integer, Integer> store1 = IntegrationTestUtils.getStore(TABLE_NAME, kafkaStreams1, QueryableStoreTypes.keyValueStore());
+            final ReadOnlyKeyValueStore<Integer, Integer> store2 = IntegrationTestUtils.getStore(TABLE_NAME, kafkaStreams2, QueryableStoreTypes.keyValueStore());
+
+            final KeyQueryMetadata keyQueryMetadata = kafkaStreams1.queryMetadataForKey(TABLE_NAME, key, (topic, somekey, value, numPartitions) -> 0);

Review Comment:
   This is essential to the retry. After the rebalance, we must re-resolve which host is now active.



##########
streams/src/test/java/org/apache/kafka/streams/integration/OptimizedKTableIntegrationTest.java:
##########
@@ -125,31 +131,37 @@ public void shouldApplyUpdatesToStandbyStore() throws Exception {
         // Assert that all messages in the first batch were processed in a timely manner
         assertThat(semaphore.tryAcquire(batch1NumMessages, 60, TimeUnit.SECONDS), is(equalTo(true)));
 
-        final ReadOnlyKeyValueStore<Integer, Integer> store1 = IntegrationTestUtils.getStore(TABLE_NAME, kafkaStreams1, QueryableStoreTypes.keyValueStore());
-        final ReadOnlyKeyValueStore<Integer, Integer> store2 = IntegrationTestUtils.getStore(TABLE_NAME, kafkaStreams2, QueryableStoreTypes.keyValueStore());
-
-        final boolean kafkaStreams1WasFirstActive;
-        final KeyQueryMetadata keyQueryMetadata = kafkaStreams1.queryMetadataForKey(TABLE_NAME, key, (topic, somekey, value, numPartitions) -> 0);
-
-        // Assert that the current value in store reflects all messages being processed
-        if ((keyQueryMetadata.activeHost().port() % 2) == 1) {
-            assertThat(store1.get(key), is(equalTo(batch1NumMessages - 1)));
-            kafkaStreams1WasFirstActive = true;
-        } else {
-            assertThat(store2.get(key), is(equalTo(batch1NumMessages - 1)));
-            kafkaStreams1WasFirstActive = false;
-        }
-
-        if (kafkaStreams1WasFirstActive) {
-            kafkaStreams1.close();
-        } else {
-            kafkaStreams2.close();
-        }
+        final AtomicReference<ReadOnlyKeyValueStore<Integer, Integer>> newActiveStore = new AtomicReference<>(null);
+        TestUtils.retryOnExceptionWithTimeout(() -> {
+            final ReadOnlyKeyValueStore<Integer, Integer> store1 = IntegrationTestUtils.getStore(TABLE_NAME, kafkaStreams1, QueryableStoreTypes.keyValueStore());
+            final ReadOnlyKeyValueStore<Integer, Integer> store2 = IntegrationTestUtils.getStore(TABLE_NAME, kafkaStreams2, QueryableStoreTypes.keyValueStore());
+
+            final KeyQueryMetadata keyQueryMetadata = kafkaStreams1.queryMetadataForKey(TABLE_NAME, key, (topic, somekey, value, numPartitions) -> 0);
+
+            try {
+                // Assert that the current value in store reflects all messages being processed
+                if ((keyQueryMetadata.activeHost().port() % 2) == 1) {
+                    assertThat(store1.get(key), is(equalTo(batch1NumMessages - 1)));
+                    kafkaStreams1.close();
+                    newActiveStore.set(store2);
+                } else {
+                    assertThat(store2.get(key), is(equalTo(batch1NumMessages - 1)));
+                    kafkaStreams2.close();
+                    newActiveStore.set(store1);
+                }
+            } catch (final InvalidStateStoreException e) {
+                LOG.warn("Detected an unexpected rebalance during test. Retrying if possible.", e);
+                throw e;
+            } catch (final Throwable t) {
+                LOG.error("Caught non-retriable exception in test. Exiting.", t);
+                throw new NoRetryException(t);
+            }

Review Comment:
   I didn't want the test to retry on unexpected exceptions, since that might cause it to miss a bug, so all other exceptions are wrapped in a `NoRetryException` and will cause the test to fail.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org