You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by vv...@apache.org on 2022/05/20 14:17:59 UTC

[kafka] branch trunk updated: MINOR: Deflake OptimizedKTableIntegrationTest (#12186)

This is an automated email from the ASF dual-hosted git repository.

vvcephei pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 3f86a183be MINOR: Deflake OptimizedKTableIntegrationTest (#12186)
3f86a183be is described below

commit 3f86a183bea5902785b4ddc6934030160d99095a
Author: John Roesler <vv...@users.noreply.github.com>
AuthorDate: Fri May 20 09:17:39 2022 -0500

    MINOR: Deflake OptimizedKTableIntegrationTest (#12186)
    
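    This test has been flaky because unexpected rebalances can occur while
    it is running. This change fixes it by detecting an unexpected rebalance
    (surfaced as an InvalidStateStoreException) and retrying the test logic
    (within a timeout). Any other failure is wrapped in a NoRetryException
    so that the test fails immediately instead of being retried.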
    
    Reviewers: Matthias J. Sax <mj...@apache.org>, Guozhang Wang <gu...@apache.org>
---
 .../OptimizedKTableIntegrationTest.java            | 60 +++++++++++++---------
 1 file changed, 36 insertions(+), 24 deletions(-)

diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/OptimizedKTableIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/OptimizedKTableIntegrationTest.java
index f9ab66cbd9..f6f045dea9 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/OptimizedKTableIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/OptimizedKTableIntegrationTest.java
@@ -30,6 +30,7 @@ import java.util.List;
 import java.util.Properties;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
@@ -39,10 +40,11 @@ import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyQueryMetadata;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.KeyQueryMetadata;
+import org.apache.kafka.streams.errors.InvalidStateStoreException;
 import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
 import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
 import org.apache.kafka.streams.kstream.Consumed;
@@ -51,6 +53,7 @@ import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.QueryableStoreTypes;
 import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
 import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.NoRetryException;
 import org.apache.kafka.test.TestUtils;
 import org.junit.After;
 import org.junit.AfterClass;
@@ -60,9 +63,12 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.rules.TestName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 @Category(IntegrationTest.class)
 public class OptimizedKTableIntegrationTest {
+    private static final Logger LOG = LoggerFactory.getLogger(OptimizedKTableIntegrationTest.class);
     private static final int NUM_BROKERS = 1;
     private static int port = 0;
     private static final String INPUT_TOPIC_NAME = "input-topic";
@@ -125,31 +131,37 @@ public class OptimizedKTableIntegrationTest {
         // Assert that all messages in the first batch were processed in a timely manner
         assertThat(semaphore.tryAcquire(batch1NumMessages, 60, TimeUnit.SECONDS), is(equalTo(true)));
 
-        final ReadOnlyKeyValueStore<Integer, Integer> store1 = IntegrationTestUtils.getStore(TABLE_NAME, kafkaStreams1, QueryableStoreTypes.keyValueStore());
-        final ReadOnlyKeyValueStore<Integer, Integer> store2 = IntegrationTestUtils.getStore(TABLE_NAME, kafkaStreams2, QueryableStoreTypes.keyValueStore());
-
-        final boolean kafkaStreams1WasFirstActive;
-        final KeyQueryMetadata keyQueryMetadata = kafkaStreams1.queryMetadataForKey(TABLE_NAME, key, (topic, somekey, value, numPartitions) -> 0);
-
-        // Assert that the current value in store reflects all messages being processed
-        if ((keyQueryMetadata.activeHost().port() % 2) == 1) {
-            assertThat(store1.get(key), is(equalTo(batch1NumMessages - 1)));
-            kafkaStreams1WasFirstActive = true;
-        } else {
-            assertThat(store2.get(key), is(equalTo(batch1NumMessages - 1)));
-            kafkaStreams1WasFirstActive = false;
-        }
-
-        if (kafkaStreams1WasFirstActive) {
-            kafkaStreams1.close();
-        } else {
-            kafkaStreams2.close();
-        }
+        final AtomicReference<ReadOnlyKeyValueStore<Integer, Integer>> newActiveStore = new AtomicReference<>(null);
+        TestUtils.retryOnExceptionWithTimeout(() -> {
+            final ReadOnlyKeyValueStore<Integer, Integer> store1 = IntegrationTestUtils.getStore(TABLE_NAME, kafkaStreams1, QueryableStoreTypes.keyValueStore());
+            final ReadOnlyKeyValueStore<Integer, Integer> store2 = IntegrationTestUtils.getStore(TABLE_NAME, kafkaStreams2, QueryableStoreTypes.keyValueStore());
+
+            final KeyQueryMetadata keyQueryMetadata = kafkaStreams1.queryMetadataForKey(TABLE_NAME, key, (topic, somekey, value, numPartitions) -> 0);
+
+            try {
+                // Assert that the current value in store reflects all messages being processed
+                if ((keyQueryMetadata.activeHost().port() % 2) == 1) {
+                    assertThat(store1.get(key), is(equalTo(batch1NumMessages - 1)));
+                    kafkaStreams1.close();
+                    newActiveStore.set(store2);
+                } else {
+                    assertThat(store2.get(key), is(equalTo(batch1NumMessages - 1)));
+                    kafkaStreams2.close();
+                    newActiveStore.set(store1);
+                }
+            } catch (final InvalidStateStoreException e) {
+                LOG.warn("Detected an unexpected rebalance during test. Retrying if possible.", e);
+                throw e;
+            } catch (final Throwable t) {
+                LOG.error("Caught non-retriable exception in test. Exiting.", t);
+                throw new NoRetryException(t);
+            }
+        });
 
-        final ReadOnlyKeyValueStore<Integer, Integer> newActiveStore = kafkaStreams1WasFirstActive ? store2 : store1;
+        // Wait for failover
         TestUtils.retryOnExceptionWithTimeout(60 * 1000, 100, () -> {
             // Assert that after failover we have recovered to the last store write
-            assertThat(newActiveStore.get(key), is(equalTo(batch1NumMessages - 1)));
+            assertThat(newActiveStore.get().get(key), is(equalTo(batch1NumMessages - 1)));
         });
 
         final int totalNumMessages = batch1NumMessages + batch2NumMessages;
@@ -161,7 +173,7 @@ public class OptimizedKTableIntegrationTest {
 
         TestUtils.retryOnExceptionWithTimeout(60 * 1000, 100, () -> {
             // Assert that the current value in store reflects all messages being processed
-            assertThat(newActiveStore.get(key), is(equalTo(totalNumMessages - 1)));
+            assertThat(newActiveStore.get().get(key), is(equalTo(totalNumMessages - 1)));
         });
     }