You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by vv...@apache.org on 2022/05/20 14:17:59 UTC
[kafka] branch trunk updated: MINOR: Deflake OptimizedKTableIntegrationTest (#12186)
This is an automated email from the ASF dual-hosted git repository.
vvcephei pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 3f86a183be MINOR: Deflake OptimizedKTableIntegrationTest (#12186)
3f86a183be is described below
commit 3f86a183bea5902785b4ddc6934030160d99095a
Author: John Roesler <vv...@users.noreply.github.com>
AuthorDate: Fri May 20 09:17:39 2022 -0500
MINOR: Deflake OptimizedKTableIntegrationTest (#12186)
This test has been flaky due to unexpected rebalances during the test.
This change fixes it by detecting an unexpected rebalance and retrying
the test logic (within a timeout).
Reviewers: Matthias J. Sax <mj...@apache.org>, Guozhang Wang <gu...@apache.org>
---
.../OptimizedKTableIntegrationTest.java | 60 +++++++++++++---------
1 file changed, 36 insertions(+), 24 deletions(-)
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/OptimizedKTableIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/OptimizedKTableIntegrationTest.java
index f9ab66cbd9..f6f045dea9 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/OptimizedKTableIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/OptimizedKTableIntegrationTest.java
@@ -30,6 +30,7 @@ import java.util.List;
import java.util.Properties;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.kafka.clients.consumer.ConsumerConfig;
@@ -39,10 +40,11 @@ import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyQueryMetadata;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.KeyQueryMetadata;
+import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.Consumed;
@@ -51,6 +53,7 @@ import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.NoRetryException;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.AfterClass;
@@ -60,9 +63,12 @@ import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
@Category(IntegrationTest.class)
public class OptimizedKTableIntegrationTest {
+ private static final Logger LOG = LoggerFactory.getLogger(OptimizedKTableIntegrationTest.class);
private static final int NUM_BROKERS = 1;
private static int port = 0;
private static final String INPUT_TOPIC_NAME = "input-topic";
@@ -125,31 +131,37 @@ public class OptimizedKTableIntegrationTest {
// Assert that all messages in the first batch were processed in a timely manner
assertThat(semaphore.tryAcquire(batch1NumMessages, 60, TimeUnit.SECONDS), is(equalTo(true)));
- final ReadOnlyKeyValueStore<Integer, Integer> store1 = IntegrationTestUtils.getStore(TABLE_NAME, kafkaStreams1, QueryableStoreTypes.keyValueStore());
- final ReadOnlyKeyValueStore<Integer, Integer> store2 = IntegrationTestUtils.getStore(TABLE_NAME, kafkaStreams2, QueryableStoreTypes.keyValueStore());
-
- final boolean kafkaStreams1WasFirstActive;
- final KeyQueryMetadata keyQueryMetadata = kafkaStreams1.queryMetadataForKey(TABLE_NAME, key, (topic, somekey, value, numPartitions) -> 0);
-
- // Assert that the current value in store reflects all messages being processed
- if ((keyQueryMetadata.activeHost().port() % 2) == 1) {
- assertThat(store1.get(key), is(equalTo(batch1NumMessages - 1)));
- kafkaStreams1WasFirstActive = true;
- } else {
- assertThat(store2.get(key), is(equalTo(batch1NumMessages - 1)));
- kafkaStreams1WasFirstActive = false;
- }
-
- if (kafkaStreams1WasFirstActive) {
- kafkaStreams1.close();
- } else {
- kafkaStreams2.close();
- }
+ final AtomicReference<ReadOnlyKeyValueStore<Integer, Integer>> newActiveStore = new AtomicReference<>(null);
+ TestUtils.retryOnExceptionWithTimeout(() -> {
+ final ReadOnlyKeyValueStore<Integer, Integer> store1 = IntegrationTestUtils.getStore(TABLE_NAME, kafkaStreams1, QueryableStoreTypes.keyValueStore());
+ final ReadOnlyKeyValueStore<Integer, Integer> store2 = IntegrationTestUtils.getStore(TABLE_NAME, kafkaStreams2, QueryableStoreTypes.keyValueStore());
+
+ final KeyQueryMetadata keyQueryMetadata = kafkaStreams1.queryMetadataForKey(TABLE_NAME, key, (topic, somekey, value, numPartitions) -> 0);
+
+ try {
+ // Assert that the current value in store reflects all messages being processed
+ if ((keyQueryMetadata.activeHost().port() % 2) == 1) {
+ assertThat(store1.get(key), is(equalTo(batch1NumMessages - 1)));
+ kafkaStreams1.close();
+ newActiveStore.set(store2);
+ } else {
+ assertThat(store2.get(key), is(equalTo(batch1NumMessages - 1)));
+ kafkaStreams2.close();
+ newActiveStore.set(store1);
+ }
+ } catch (final InvalidStateStoreException e) {
+ LOG.warn("Detected an unexpected rebalance during test. Retrying if possible.", e);
+ throw e;
+ } catch (final Throwable t) {
+ LOG.error("Caught non-retriable exception in test. Exiting.", t);
+ throw new NoRetryException(t);
+ }
+ });
- final ReadOnlyKeyValueStore<Integer, Integer> newActiveStore = kafkaStreams1WasFirstActive ? store2 : store1;
+ // Wait for failover
TestUtils.retryOnExceptionWithTimeout(60 * 1000, 100, () -> {
// Assert that after failover we have recovered to the last store write
- assertThat(newActiveStore.get(key), is(equalTo(batch1NumMessages - 1)));
+ assertThat(newActiveStore.get().get(key), is(equalTo(batch1NumMessages - 1)));
});
final int totalNumMessages = batch1NumMessages + batch2NumMessages;
@@ -161,7 +173,7 @@ public class OptimizedKTableIntegrationTest {
TestUtils.retryOnExceptionWithTimeout(60 * 1000, 100, () -> {
// Assert that the current value in store reflects all messages being processed
- assertThat(newActiveStore.get(key), is(equalTo(totalNumMessages - 1)));
+ assertThat(newActiveStore.get().get(key), is(equalTo(totalNumMessages - 1)));
});
}