Posted to commits@kafka.apache.org by ca...@apache.org on 2022/09/19 17:27:25 UTC

[kafka] branch trunk updated: KAFKA-10199: Adapt restoration integration tests to state updater (#12650)

This is an automated email from the ASF dual-hosted git repository.

cadonna pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new b4fa3496e1 KAFKA-10199: Adapt restoration integration tests to state updater (#12650)
b4fa3496e1 is described below

commit b4fa3496e19471aa083251337fa32ec811d10079
Author: Bruno Cadonna <ca...@apache.org>
AuthorDate: Mon Sep 19 19:27:17 2022 +0200

    KAFKA-10199: Adapt restoration integration tests to state updater (#12650)
    
    Transforms the integration test that verifies restoration into a
    parametrized test. The parametrized test runs once with the
    state updater enabled and once with it disabled.
    
    Reviewer: Guozhang Wang <wa...@gmail.com>
---
 .../integration/RestoreIntegrationTest.java        | 51 ++++++++++++++--------
 .../integration/utils/IntegrationTestUtils.java    |  9 ++--
 2 files changed, 39 insertions(+), 21 deletions(-)
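
For reference, the parameterization applied in RestoreIntegrationTest follows the standard JUnit 5 @ParameterizedTest / @MethodSource pattern. A minimal, self-contained sketch of that pattern is below; the class name, test method name, and print statement are illustrative assumptions, not identifiers from the Kafka code base.

    // Minimal sketch of the JUnit 5 parameterization pattern used in the patch.
    // Class and method names here are illustrative, not taken from the diff.
    import java.util.stream.Stream;

    import org.junit.jupiter.params.ParameterizedTest;
    import org.junit.jupiter.params.provider.MethodSource;

    class StateUpdaterToggleExampleTest {

        // Supplies one invocation with the state updater enabled and one with it disabled.
        private static Stream<Boolean> parameters() {
            return Stream.of(Boolean.TRUE, Boolean.FALSE);
        }

        @ParameterizedTest
        @MethodSource("parameters")
        void runsOncePerFlag(final boolean stateUpdaterEnabled) {
            // In the real test, the flag would be passed into the Streams configuration,
            // e.g. via StreamsConfig.InternalConfig.STATE_UPDATER_ENABLED as in the diff below.
            System.out.println("state updater enabled: " + stateUpdaterEnabled);
        }
    }

Each annotated test method then runs twice, once per value returned by parameters(), which is how the restoration tests below cover both the state-updater and the legacy restoration code paths.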

diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
index d95468c080..4048868486 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
@@ -33,6 +33,7 @@ import org.apache.kafka.streams.KafkaStreams.State;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.StreamsConfig.InternalConfig;
 import org.apache.kafka.streams.Topology;
 import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
 import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
@@ -60,9 +61,10 @@ import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Tag;
-import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.TestInfo;
 import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
 
 import java.io.File;
 import java.io.IOException;
@@ -74,6 +76,7 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Stream;
 
 import static java.util.Arrays.asList;
 import static java.util.Collections.singletonList;
@@ -117,7 +120,7 @@ public class RestoreIntegrationTest {
         CLUSTER.createTopic(inputStream, 2, 1);
     }
 
-    private Properties props() {
+    private Properties props(final boolean stateUpdaterEnabled) {
         final Properties streamsConfiguration = new Properties();
         streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, appId);
         streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
@@ -127,6 +130,7 @@ public class RestoreIntegrationTest {
         streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
         streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000L);
         streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        streamsConfiguration.put(InternalConfig.STATE_UPDATER_ENABLED, stateUpdaterEnabled);
         return streamsConfiguration;
     }
 
@@ -137,12 +141,19 @@ public class RestoreIntegrationTest {
         }
     }
 
-    @Test
-    public void shouldRestoreStateFromSourceTopic() throws Exception {
+    private static Stream<Boolean> parameters() {
+        return Stream.of(
+            Boolean.TRUE,
+            Boolean.FALSE);
+    }
+
+    @ParameterizedTest
+    @MethodSource("parameters")
+    public void shouldRestoreStateFromSourceTopic(final boolean stateUpdaterEnabled) throws Exception {
         final AtomicInteger numReceived = new AtomicInteger(0);
         final StreamsBuilder builder = new StreamsBuilder();
 
-        final Properties props = props();
+        final Properties props = props(stateUpdaterEnabled);
         props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE);
 
         // restoring from 1000 to 4000 (committed), and then process from 4000 to 5000 on each of the two partitions
@@ -202,15 +213,16 @@ public class RestoreIntegrationTest {
         assertThat(numReceived.get(), equalTo(offsetLimitDelta * 2));
     }
 
-    @Test
-    public void shouldRestoreStateFromChangelogTopic() throws Exception {
+    @ParameterizedTest
+    @MethodSource("parameters")
+    public void shouldRestoreStateFromChangelogTopic(final boolean stateUpdaterEnabled) throws Exception {
         final String changelog = appId + "-store-changelog";
         CLUSTER.createTopic(changelog, 2, 1);
 
         final AtomicInteger numReceived = new AtomicInteger(0);
         final StreamsBuilder builder = new StreamsBuilder();
 
-        final Properties props = props();
+        final Properties props = props(stateUpdaterEnabled);
 
         // restoring from 1000 to 5000, and then process from 5000 to 10000 on each of the two partitions
         final int offsetCheckpointed = 1000;
@@ -268,8 +280,9 @@ public class RestoreIntegrationTest {
         assertThat(numReceived.get(), equalTo(numberOfKeys));
     }
 
-    @Test
-    public void shouldSuccessfullyStartWhenLoggingDisabled() throws InterruptedException {
+    @ParameterizedTest
+    @MethodSource("parameters")
+    public void shouldSuccessfullyStartWhenLoggingDisabled(final boolean stateUpdaterEnabled) throws InterruptedException {
         final StreamsBuilder builder = new StreamsBuilder();
 
         final KStream<Integer, Integer> stream = builder.stream(inputStream);
@@ -279,7 +292,7 @@ public class RestoreIntegrationTest {
                     Materialized.<Integer, Integer, KeyValueStore<Bytes, byte[]>>as("reduce-store").withLoggingDisabled());
 
         final CountDownLatch startupLatch = new CountDownLatch(1);
-        kafkaStreams = new KafkaStreams(builder.build(), props());
+        kafkaStreams = new KafkaStreams(builder.build(), props(stateUpdaterEnabled));
         kafkaStreams.setStateListener((newState, oldState) -> {
             if (newState == KafkaStreams.State.RUNNING && oldState == KafkaStreams.State.REBALANCING) {
                 startupLatch.countDown();
@@ -291,8 +304,9 @@ public class RestoreIntegrationTest {
         assertTrue(startupLatch.await(30, TimeUnit.SECONDS));
     }
 
-    @Test
-    public void shouldProcessDataFromStoresWithLoggingDisabled() throws InterruptedException {
+    @ParameterizedTest
+    @MethodSource("parameters")
+    public void shouldProcessDataFromStoresWithLoggingDisabled(final boolean stateUpdaterEnabled) throws InterruptedException {
         IntegrationTestUtils.produceKeyValuesSynchronously(inputStream,
                                                            asList(KeyValue.pair(1, 1),
                                                                          KeyValue.pair(2, 2),
@@ -320,7 +334,7 @@ public class RestoreIntegrationTest {
 
         final Topology topology = streamsBuilder.build();
 
-        kafkaStreams = new KafkaStreams(topology, props());
+        kafkaStreams = new KafkaStreams(topology, props(stateUpdaterEnabled));
 
         final CountDownLatch latch = new CountDownLatch(1);
         kafkaStreams.setStateListener((newState, oldState) -> {
@@ -335,8 +349,9 @@ public class RestoreIntegrationTest {
         assertTrue(processorLatch.await(30, TimeUnit.SECONDS));
     }
 
-    @Test
-    public void shouldRecycleStateFromStandbyTaskPromotedToActiveTaskAndNotRestore() throws Exception {
+    @ParameterizedTest
+    @MethodSource("parameters")
+    public void shouldRecycleStateFromStandbyTaskPromotedToActiveTaskAndNotRestore(final boolean stateUpdaterEnabled) throws Exception {
         final StreamsBuilder builder = new StreamsBuilder();
         builder.table(
                 inputStream,
@@ -344,13 +359,13 @@ public class RestoreIntegrationTest {
         );
         createStateForRestoration(inputStream, 0);
 
-        final Properties props1 = props();
+        final Properties props1 = props(stateUpdaterEnabled);
         props1.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
         props1.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory(appId + "-1").getPath());
         purgeLocalStreamsState(props1);
         final KafkaStreams client1 = new KafkaStreams(builder.build(), props1);
 
-        final Properties props2 = props();
+        final Properties props2 = props(stateUpdaterEnabled);
         props2.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
         props2.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory(appId + "-2").getPath());
         purgeLocalStreamsState(props2);
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
index 689b1c0beb..c14988cdae 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
@@ -239,11 +239,14 @@ public class IntegrationTestUtils {
      * Used by tests migrated to JUnit 5.
      */
     public static String safeUniqueTestName(final Class<?> testClass, final TestInfo testInfo) {
-        return safeUniqueTestName(testClass, testInfo.getTestMethod().map(Method::getName).orElse(""));
+        final String displayName = testInfo.getDisplayName();
+        final String methodName = testInfo.getTestMethod().map(Method::getName).orElse("unknownMethodName");
+        final String testName = displayName.contains(methodName) ? methodName : methodName + displayName;
+        return safeUniqueTestName(testClass, testName);
     }
 
-    private static String safeUniqueTestName(final Class<?> testClass, final String methodName) {
-        return (testClass.getSimpleName() + methodName)
+    private static String safeUniqueTestName(final Class<?> testClass, final String testName) {
+        return (testClass.getSimpleName() + testName)
                 .replace(':', '_')
                 .replace('.', '_')
                 .replace('[', '_')