You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ca...@apache.org on 2022/09/19 17:27:25 UTC
[kafka] branch trunk updated: KAFKA-10199: Adapt restoration integration tests to state updater (#12650)
This is an automated email from the ASF dual-hosted git repository.
cadonna pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new b4fa3496e1 KAFKA-10199: Adapt restoration integration tests to state updater (#12650)
b4fa3496e1 is described below
commit b4fa3496e19471aa083251337fa32ec811d10079
Author: Bruno Cadonna <ca...@apache.org>
AuthorDate: Mon Sep 19 19:27:17 2022 +0200
KAFKA-10199: Adapt restoration integration tests to state updater (#12650)
Transforms the integration test that verifies restoration into a
parametrized test. The parametrized test runs once with
state updater enabled and once with state updater disabled.
Reviewer: Guozhang Wang <wa...@gmail.com>
---
.../integration/RestoreIntegrationTest.java | 51 ++++++++++++++--------
.../integration/utils/IntegrationTestUtils.java | 9 ++--
2 files changed, 39 insertions(+), 21 deletions(-)
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
index d95468c080..4048868486 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
@@ -33,6 +33,7 @@ import org.apache.kafka.streams.KafkaStreams.State;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.StreamsConfig.InternalConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
@@ -60,9 +61,10 @@ import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
-import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
import java.io.File;
import java.io.IOException;
@@ -74,6 +76,7 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Stream;
import static java.util.Arrays.asList;
import static java.util.Collections.singletonList;
@@ -117,7 +120,7 @@ public class RestoreIntegrationTest {
CLUSTER.createTopic(inputStream, 2, 1);
}
- private Properties props() {
+ private Properties props(final boolean stateUpdaterEnabled) {
final Properties streamsConfiguration = new Properties();
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, appId);
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
@@ -127,6 +130,7 @@ public class RestoreIntegrationTest {
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000L);
streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+ streamsConfiguration.put(InternalConfig.STATE_UPDATER_ENABLED, stateUpdaterEnabled);
return streamsConfiguration;
}
@@ -137,12 +141,19 @@ public class RestoreIntegrationTest {
}
}
- @Test
- public void shouldRestoreStateFromSourceTopic() throws Exception {
+ private static Stream<Boolean> parameters() {
+ return Stream.of(
+ Boolean.TRUE,
+ Boolean.FALSE);
+ }
+
+ @ParameterizedTest
+ @MethodSource("parameters")
+ public void shouldRestoreStateFromSourceTopic(final boolean stateUpdaterEnabled) throws Exception {
final AtomicInteger numReceived = new AtomicInteger(0);
final StreamsBuilder builder = new StreamsBuilder();
- final Properties props = props();
+ final Properties props = props(stateUpdaterEnabled);
props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE);
// restoring from 1000 to 4000 (committed), and then process from 4000 to 5000 on each of the two partitions
@@ -202,15 +213,16 @@ public class RestoreIntegrationTest {
assertThat(numReceived.get(), equalTo(offsetLimitDelta * 2));
}
- @Test
- public void shouldRestoreStateFromChangelogTopic() throws Exception {
+ @ParameterizedTest
+ @MethodSource("parameters")
+ public void shouldRestoreStateFromChangelogTopic(final boolean stateUpdaterEnabled) throws Exception {
final String changelog = appId + "-store-changelog";
CLUSTER.createTopic(changelog, 2, 1);
final AtomicInteger numReceived = new AtomicInteger(0);
final StreamsBuilder builder = new StreamsBuilder();
- final Properties props = props();
+ final Properties props = props(stateUpdaterEnabled);
// restoring from 1000 to 5000, and then process from 5000 to 10000 on each of the two partitions
final int offsetCheckpointed = 1000;
@@ -268,8 +280,9 @@ public class RestoreIntegrationTest {
assertThat(numReceived.get(), equalTo(numberOfKeys));
}
- @Test
- public void shouldSuccessfullyStartWhenLoggingDisabled() throws InterruptedException {
+ @ParameterizedTest
+ @MethodSource("parameters")
+ public void shouldSuccessfullyStartWhenLoggingDisabled(final boolean stateUpdaterEnabled) throws InterruptedException {
final StreamsBuilder builder = new StreamsBuilder();
final KStream<Integer, Integer> stream = builder.stream(inputStream);
@@ -279,7 +292,7 @@ public class RestoreIntegrationTest {
Materialized.<Integer, Integer, KeyValueStore<Bytes, byte[]>>as("reduce-store").withLoggingDisabled());
final CountDownLatch startupLatch = new CountDownLatch(1);
- kafkaStreams = new KafkaStreams(builder.build(), props());
+ kafkaStreams = new KafkaStreams(builder.build(), props(stateUpdaterEnabled));
kafkaStreams.setStateListener((newState, oldState) -> {
if (newState == KafkaStreams.State.RUNNING && oldState == KafkaStreams.State.REBALANCING) {
startupLatch.countDown();
@@ -291,8 +304,9 @@ public class RestoreIntegrationTest {
assertTrue(startupLatch.await(30, TimeUnit.SECONDS));
}
- @Test
- public void shouldProcessDataFromStoresWithLoggingDisabled() throws InterruptedException {
+ @ParameterizedTest
+ @MethodSource("parameters")
+ public void shouldProcessDataFromStoresWithLoggingDisabled(final boolean stateUpdaterEnabled) throws InterruptedException {
IntegrationTestUtils.produceKeyValuesSynchronously(inputStream,
asList(KeyValue.pair(1, 1),
KeyValue.pair(2, 2),
@@ -320,7 +334,7 @@ public class RestoreIntegrationTest {
final Topology topology = streamsBuilder.build();
- kafkaStreams = new KafkaStreams(topology, props());
+ kafkaStreams = new KafkaStreams(topology, props(stateUpdaterEnabled));
final CountDownLatch latch = new CountDownLatch(1);
kafkaStreams.setStateListener((newState, oldState) -> {
@@ -335,8 +349,9 @@ public class RestoreIntegrationTest {
assertTrue(processorLatch.await(30, TimeUnit.SECONDS));
}
- @Test
- public void shouldRecycleStateFromStandbyTaskPromotedToActiveTaskAndNotRestore() throws Exception {
+ @ParameterizedTest
+ @MethodSource("parameters")
+ public void shouldRecycleStateFromStandbyTaskPromotedToActiveTaskAndNotRestore(final boolean stateUpdaterEnabled) throws Exception {
final StreamsBuilder builder = new StreamsBuilder();
builder.table(
inputStream,
@@ -344,13 +359,13 @@ public class RestoreIntegrationTest {
);
createStateForRestoration(inputStream, 0);
- final Properties props1 = props();
+ final Properties props1 = props(stateUpdaterEnabled);
props1.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
props1.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory(appId + "-1").getPath());
purgeLocalStreamsState(props1);
final KafkaStreams client1 = new KafkaStreams(builder.build(), props1);
- final Properties props2 = props();
+ final Properties props2 = props(stateUpdaterEnabled);
props2.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
props2.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory(appId + "-2").getPath());
purgeLocalStreamsState(props2);
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
index 689b1c0beb..c14988cdae 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
@@ -239,11 +239,14 @@ public class IntegrationTestUtils {
* Used by tests migrated to JUnit 5.
*/
public static String safeUniqueTestName(final Class<?> testClass, final TestInfo testInfo) {
- return safeUniqueTestName(testClass, testInfo.getTestMethod().map(Method::getName).orElse(""));
+ final String displayName = testInfo.getDisplayName();
+ final String methodName = testInfo.getTestMethod().map(Method::getName).orElse("unknownMethodName");
+ final String testName = displayName.contains(methodName) ? methodName : methodName + displayName;
+ return safeUniqueTestName(testClass, testName);
}
- private static String safeUniqueTestName(final Class<?> testClass, final String methodName) {
- return (testClass.getSimpleName() + methodName)
+ private static String safeUniqueTestName(final Class<?> testClass, final String testName) {
+ return (testClass.getSimpleName() + testName)
.replace(':', '_')
.replace('.', '_')
.replace('[', '_')