You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ca...@apache.org on 2022/07/21 15:28:01 UTC
[kafka] branch trunk updated: KAFKA-14001: Migrate streams module to JUnit 5 - Part 1 (#12285)
This is an automated email from the ASF dual-hosted git repository.
cadonna pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 569a358a3f KAFKA-14001: Migrate streams module to JUnit 5 - Part 1 (#12285)
569a358a3f is described below
commit 569a358a3f9c4d2aa7aaeb277f691ad0bae5db34
Author: Christo Lolov <lo...@amazon.com>
AuthorDate: Thu Jul 21 16:27:53 2022 +0100
KAFKA-14001: Migrate streams module to JUnit 5 - Part 1 (#12285)
This pull request addresses https://issues.apache.org/jira/browse/KAFKA-14001. It is the first of a series of pull requests which address the move of Kafka Streams tests from JUnit 4 to JUnit 5.
Reviewers: Divij Vaidya <di...@amazon.com>, Bruno Cadonna <ca...@apache.org>
---
.../integration/AdjustStreamThreadCountTest.java | 47 ++++-----
.../integration/EmitOnChangeIntegrationTest.java | 30 +++---
.../FineGrainedAutoResetIntegrationTest.java | 24 +++--
.../integration/GlobalKTableIntegrationTest.java | 44 ++++-----
.../integration/GlobalThreadShutDownOrderTest.java | 37 +++----
...ighAvailabilityTaskAssignorIntegrationTest.java | 36 +++----
.../streams/integration/IQv2IntegrationTest.java | 43 ++++----
.../integration/InternalTopicIntegrationTest.java | 31 +++---
.../KStreamAggregationDedupIntegrationTest.java | 62 ++++++------
.../KStreamAggregationIntegrationTest.java | 109 +++++++++++----------
...yInnerJoinCustomPartitionerIntegrationTest.java | 30 +++---
.../KTableSourceTopicRestartIntegrationTest.java | 38 ++++---
.../integration/RackAwarenessIntegrationTest.java | 43 ++++----
.../integration/RestoreIntegrationTest.java | 36 +++----
.../integration/utils/IntegrationTestUtils.java | 18 +++-
15 files changed, 296 insertions(+), 332 deletions(-)
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java
index c8c30f6837..bf64068b94 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java
@@ -34,17 +34,16 @@ import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.TestUtils;
+import org.junit.experimental.categories.Category;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.api.Timeout;
import java.util.concurrent.atomic.AtomicBoolean;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-import org.junit.rules.TestName;
-import org.junit.rules.Timeout;
import java.io.IOException;
import java.time.Duration;
@@ -69,33 +68,27 @@ import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.not;
import static org.hamcrest.MatcherAssert.assertThat;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertThrows;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+@Timeout(600)
@Category(IntegrationTest.class)
public class AdjustStreamThreadCountTest {
- @Rule
- public Timeout globalTimeout = Timeout.seconds(600);
-
public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);
- @BeforeClass
+ @BeforeAll
public static void startCluster() throws IOException {
CLUSTER.start();
}
- @AfterClass
+ @AfterAll
public static void closeCluster() {
CLUSTER.stop();
}
-
- @Rule
- public TestName testName = new TestName();
-
private final List<KafkaStreams.State> stateTransitionHistory = new ArrayList<>();
private static String inputTopic;
private static StreamsBuilder builder;
@@ -103,9 +96,9 @@ public class AdjustStreamThreadCountTest {
private static String appId = "";
public static final Duration DEFAULT_DURATION = Duration.ofSeconds(30);
- @Before
- public void setup() {
- final String testId = safeUniqueTestName(getClass(), testName);
+ @BeforeEach
+ public void setup(final TestInfo testInfo) {
+ final String testId = safeUniqueTestName(getClass(), testInfo);
appId = "appId_" + testId;
inputTopic = "input" + testId;
IntegrationTestUtils.cleanStateBeforeTest(CLUSTER, inputTopic);
@@ -131,7 +124,7 @@ public class AdjustStreamThreadCountTest {
waitForRunning();
}
- @After
+ @AfterEach
public void teardown() throws IOException {
purgeLocalStreamsState(properties);
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/EmitOnChangeIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/EmitOnChangeIntegrationTest.java
index f41c95a6bb..c7a545068f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/EmitOnChangeIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/EmitOnChangeIntegrationTest.java
@@ -33,14 +33,13 @@ import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.StreamsTestUtils;
import org.apache.kafka.test.TestUtils;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Rule;
-import org.junit.Test;
import org.junit.experimental.categories.Category;
-import org.junit.rules.TestName;
-import org.junit.rules.Timeout;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.api.Timeout;
import java.io.IOException;
import java.util.Arrays;
@@ -52,35 +51,30 @@ import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.apache.kafka.common.utils.Utils.mkObjectProperties;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
+@Timeout(600)
@Category(IntegrationTest.class)
public class EmitOnChangeIntegrationTest {
- @Rule
- public Timeout globalTimeout = Timeout.seconds(600);
-
private static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);
- @BeforeClass
+ @BeforeAll
public static void startCluster() throws IOException {
CLUSTER.start();
}
- @AfterClass
+ @AfterAll
public static void closeCluster() {
CLUSTER.stop();
}
- @Rule
- public TestName testName = new TestName();
-
private static String inputTopic;
private static String inputTopic2;
private static String outputTopic;
private static String outputTopic2;
private static String appId = "";
- @Before
- public void setup() {
- final String testId = safeUniqueTestName(getClass(), testName);
+ @BeforeEach
+ public void setup(final TestInfo testInfo) {
+ final String testId = safeUniqueTestName(getClass(), testInfo);
appId = "appId_" + testId;
inputTopic = "input" + testId;
inputTopic2 = "input2" + testId;
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/FineGrainedAutoResetIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/FineGrainedAutoResetIntegrationTest.java
index 05f2359efc..0d439a32ba 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/FineGrainedAutoResetIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/FineGrainedAutoResetIntegrationTest.java
@@ -41,13 +41,12 @@ import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.StreamsTestUtils;
import org.apache.kafka.test.TestUtils;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Rule;
-import org.junit.Test;
import org.junit.experimental.categories.Category;
-import org.junit.rules.Timeout;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
import java.io.IOException;
import java.util.ArrayList;
@@ -62,12 +61,11 @@ import java.util.regex.Pattern;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
-import static org.junit.Assert.fail;
+import static org.junit.jupiter.api.Assertions.fail;
-@Category({IntegrationTest.class})
+@Timeout(600)
+@Category(IntegrationTest.class)
public class FineGrainedAutoResetIntegrationTest {
- @Rule
- public Timeout globalTimeout = Timeout.seconds(600);
private static final int NUM_BROKERS = 1;
private static final String DEFAULT_OUTPUT_TOPIC = "outputTopic";
private static final String OUTPUT_TOPIC_0 = "outputTopic_0";
@@ -76,7 +74,7 @@ public class FineGrainedAutoResetIntegrationTest {
public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
- @BeforeClass
+ @BeforeAll
public static void startCluster() throws IOException, InterruptedException {
CLUSTER.start();
CLUSTER.createTopics(
@@ -105,7 +103,7 @@ public class FineGrainedAutoResetIntegrationTest {
OUTPUT_TOPIC_2);
}
- @AfterClass
+ @AfterAll
public static void closeCluster() {
CLUSTER.stop();
}
@@ -143,7 +141,7 @@ public class FineGrainedAutoResetIntegrationTest {
private final String topicYTestMessage = "topic-Y test";
private final String topicZTestMessage = "topic-Z test";
- @Before
+ @BeforeEach
public void setUp() throws IOException {
final Properties props = new Properties();
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
index d2c92a761d..c55dbfb275 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
@@ -44,15 +44,14 @@ import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.MockApiProcessorSupplier;
import org.apache.kafka.test.TestUtils;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Rule;
-import org.junit.Test;
import org.junit.experimental.categories.Category;
-import org.junit.rules.TestName;
-import org.junit.rules.Timeout;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.api.Timeout;
import java.io.IOException;
import java.time.Duration;
@@ -66,23 +65,21 @@ import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.sa
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForApplicationState;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.IsEqual.equalTo;
-import static org.junit.Assert.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
-@Category({IntegrationTest.class})
+@Timeout(600)
+@Category(IntegrationTest.class)
public class GlobalKTableIntegrationTest {
- @Rule
- public Timeout globalTimeout = Timeout.seconds(600);
-
private static final int NUM_BROKERS = 1;
public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
- @BeforeClass
+ @BeforeAll
public static void startCluster() throws IOException {
CLUSTER.start();
}
- @AfterClass
+ @AfterAll
public static void closeCluster() {
CLUSTER.stop();
}
@@ -101,15 +98,12 @@ public class GlobalKTableIntegrationTest {
private KStream<String, Long> stream;
private MockApiProcessorSupplier<String, String, Void, Void> supplier;
- @Rule
- public TestName testName = new TestName();
-
- @Before
- public void before() throws Exception {
+ @BeforeEach
+ public void before(final TestInfo testInfo) throws Exception {
builder = new StreamsBuilder();
- createTopics();
+ createTopics(testInfo);
streamsConfiguration = new Properties();
- final String safeTestName = safeUniqueTestName(getClass(), testName);
+ final String safeTestName = safeUniqueTestName(getClass(), testInfo);
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + safeTestName);
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
@@ -125,7 +119,7 @@ public class GlobalKTableIntegrationTest {
supplier = new MockApiProcessorSupplier<>();
}
- @After
+ @AfterEach
public void whenShuttingDown() throws Exception {
if (kafkaStreams != null) {
kafkaStreams.close();
@@ -348,8 +342,8 @@ public class GlobalKTableIntegrationTest {
kafkaStreams.close();
}
- private void createTopics() throws Exception {
- final String safeTestName = safeUniqueTestName(getClass(), testName);
+ private void createTopics(final TestInfo testInfo) throws Exception {
+ final String safeTestName = safeUniqueTestName(getClass(), testInfo);
streamTopic = "stream-" + safeTestName;
globalTableTopic = "globalTable-" + safeTestName;
CLUSTER.createTopics(streamTopic);
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalThreadShutDownOrderTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalThreadShutDownOrderTest.java
index 6f4bd53427..fcf5bb1e6c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalThreadShutDownOrderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalThreadShutDownOrderTest.java
@@ -39,15 +39,14 @@ import org.apache.kafka.streams.state.internals.KeyValueStoreBuilder;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.MockApiProcessorSupplier;
import org.apache.kafka.test.TestUtils;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Rule;
-import org.junit.Test;
import org.junit.experimental.categories.Category;
-import org.junit.rules.TestName;
-import org.junit.rules.Timeout;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.api.Timeout;
import java.io.IOException;
import java.time.Duration;
@@ -58,7 +57,7 @@ import java.util.Properties;
import java.util.concurrent.atomic.AtomicInteger;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
-import static org.junit.Assert.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
/**
@@ -70,10 +69,9 @@ import static org.junit.Assert.assertEquals;
* Otherwise if the GlobalStreamThread were to close underneath the StreamThread
* an exception would be thrown as the GlobalStreamThread closes all global stores on closing.
*/
-@Category({IntegrationTest.class})
+@Timeout(600)
+@Category(IntegrationTest.class)
public class GlobalThreadShutDownOrderTest {
- @Rule
- public Timeout globalTimeout = Timeout.seconds(600);
private static final int NUM_BROKERS = 1;
private static final Properties BROKER_CONFIG;
@@ -87,12 +85,12 @@ public class GlobalThreadShutDownOrderTest {
public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS, BROKER_CONFIG);
- @BeforeClass
+ @BeforeAll
public static void startCluster() throws IOException {
CLUSTER.start();
}
- @AfterClass
+ @AfterAll
public static void closeCluster() {
CLUSTER.stop();
}
@@ -108,15 +106,12 @@ public class GlobalThreadShutDownOrderTest {
private final List<Long> retrievedValuesList = new ArrayList<>();
private boolean firstRecordProcessed;
- @Rule
- public TestName testName = new TestName();
-
- @Before
- public void before() throws Exception {
+ @BeforeEach
+ public void before(final TestInfo testInfo) throws Exception {
builder = new StreamsBuilder();
createTopics();
streamsConfiguration = new Properties();
- final String safeTestName = safeUniqueTestName(getClass(), testName);
+ final String safeTestName = safeUniqueTestName(getClass(), testInfo);
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + safeTestName);
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
@@ -145,7 +140,7 @@ public class GlobalThreadShutDownOrderTest {
}
- @After
+ @AfterEach
public void after() throws Exception {
if (kafkaStreams != null) {
kafkaStreams.close();
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/HighAvailabilityTaskAssignorIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/HighAvailabilityTaskAssignorIntegrationTest.java
index d712dfa3fc..44ca7d461d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/HighAvailabilityTaskAssignorIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/HighAvailabilityTaskAssignorIntegrationTest.java
@@ -43,13 +43,12 @@ import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.NoRetryException;
import org.apache.kafka.test.TestUtils;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Rule;
-import org.junit.Test;
import org.junit.experimental.categories.Category;
-import org.junit.rules.TestName;
-import org.junit.rules.Timeout;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.api.Timeout;
import java.io.IOException;
import java.util.Collection;
@@ -73,43 +72,38 @@ import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.sa
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
+@Timeout(600)
@Category(IntegrationTest.class)
public class HighAvailabilityTaskAssignorIntegrationTest {
- @Rule
- public Timeout globalTimeout = Timeout.seconds(600);
-
public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);
- @BeforeClass
+ @BeforeAll
public static void startCluster() throws IOException {
CLUSTER.start();
}
- @AfterClass
+ @AfterAll
public static void closeCluster() {
CLUSTER.stop();
}
-
- @Rule
- public TestName testName = new TestName();
-
@Test
- public void shouldScaleOutWithWarmupTasksAndInMemoryStores() throws InterruptedException {
+ public void shouldScaleOutWithWarmupTasksAndInMemoryStores(final TestInfo testInfo) throws InterruptedException {
// NB: this test takes at least a minute to run, because it needs a probing rebalance, and the minimum
// value is one minute
- shouldScaleOutWithWarmupTasks(storeName -> Materialized.as(Stores.inMemoryKeyValueStore(storeName)));
+ shouldScaleOutWithWarmupTasks(storeName -> Materialized.as(Stores.inMemoryKeyValueStore(storeName)), testInfo);
}
@Test
- public void shouldScaleOutWithWarmupTasksAndPersistentStores() throws InterruptedException {
+ public void shouldScaleOutWithWarmupTasksAndPersistentStores(final TestInfo testInfo) throws InterruptedException {
// NB: this test takes at least a minute to run, because it needs a probing rebalance, and the minimum
// value is one minute
- shouldScaleOutWithWarmupTasks(storeName -> Materialized.as(Stores.persistentKeyValueStore(storeName)));
+ shouldScaleOutWithWarmupTasks(storeName -> Materialized.as(Stores.persistentKeyValueStore(storeName)), testInfo);
}
- private void shouldScaleOutWithWarmupTasks(final Function<String, Materialized<Object, Object, KeyValueStore<Bytes, byte[]>>> materializedFunction) throws InterruptedException {
- final String testId = safeUniqueTestName(getClass(), testName);
+ private void shouldScaleOutWithWarmupTasks(final Function<String, Materialized<Object, Object, KeyValueStore<Bytes, byte[]>>> materializedFunction,
+ final TestInfo testInfo) throws InterruptedException {
+ final String testId = safeUniqueTestName(getClass(), testInfo);
final String appId = "appId_" + System.currentTimeMillis() + "_" + testId;
final String inputTopic = "input" + testId;
final Set<TopicPartition> inputTopicPartitions = mkSet(
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/IQv2IntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/IQv2IntegrationTest.java
index 5d31398e38..e223b7e43c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/IQv2IntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/IQv2IntegrationTest.java
@@ -55,15 +55,14 @@ import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.streams.state.internals.StoreQueryUtils;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.TestUtils;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Rule;
-import org.junit.Test;
import org.junit.experimental.categories.Category;
-import org.junit.rules.TestName;
-import org.junit.rules.Timeout;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.api.Timeout;
import java.io.IOException;
import java.lang.reflect.Field;
@@ -88,10 +87,9 @@ import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.matchesPattern;
import static org.junit.jupiter.api.Assertions.assertThrows;
-@Category({IntegrationTest.class})
+@Timeout(600)
+@Category(IntegrationTest.class)
public class IQv2IntegrationTest {
- @Rule
- public Timeout globalTimeout = Timeout.seconds(600);
private static final int NUM_BROKERS = 1;
public static final Duration WINDOW_SIZE = Duration.ofMinutes(5);
private static int port = 0;
@@ -103,10 +101,7 @@ public class IQv2IntegrationTest {
private KafkaStreams kafkaStreams;
- @Rule
- public TestName testName = new TestName();
-
- @BeforeClass
+ @BeforeAll
public static void before()
throws InterruptedException, IOException, ExecutionException, TimeoutException {
CLUSTER.start();
@@ -155,8 +150,8 @@ public class IQv2IntegrationTest {
));
}
- @Before
- public void beforeTest() {
+ @BeforeEach
+ public void beforeTest(final TestInfo testInfo) {
final StreamsBuilder builder = new StreamsBuilder();
builder.table(
@@ -165,17 +160,17 @@ public class IQv2IntegrationTest {
Materialized.as(STORE_NAME)
);
- kafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration());
+ kafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration(testInfo));
kafkaStreams.cleanUp();
}
- @After
+ @AfterEach
public void afterTest() {
kafkaStreams.close();
kafkaStreams.cleanUp();
}
- @AfterClass
+ @AfterAll
public static void after() {
CLUSTER.stop();
}
@@ -295,7 +290,7 @@ public class IQv2IntegrationTest {
}
@Test
- public void shouldNotRequireQueryHandler() {
+ public void shouldNotRequireQueryHandler(final TestInfo testInfo) {
final KeyQuery<Integer, ValueAndTimestamp<Integer>> query = KeyQuery.withKey(1);
final int partition = 1;
final Set<Integer> partitions = singleton(partition);
@@ -424,7 +419,7 @@ public class IQv2IntegrationTest {
})
);
- kafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration());
+ kafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration(testInfo));
kafkaStreams.cleanUp();
kafkaStreams.start();
@@ -442,8 +437,8 @@ public class IQv2IntegrationTest {
}
- private Properties streamsConfiguration() {
- final String safeTestName = IntegrationTestUtils.safeUniqueTestName(getClass(), testName);
+ private Properties streamsConfiguration(final TestInfo testInfo) {
+ final String safeTestName = IntegrationTestUtils.safeUniqueTestName(getClass(), testInfo);
final Properties config = new Properties();
config.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE);
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
index 98d6c7c4bd..f5db920a46 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
@@ -44,14 +44,13 @@ import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.MockMapper;
import org.apache.kafka.test.TestUtils;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Rule;
-import org.junit.Test;
import org.junit.experimental.categories.Category;
-import org.junit.rules.Timeout;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
import java.io.IOException;
import java.util.Arrays;
@@ -67,27 +66,25 @@ import static java.time.Duration.ofSeconds;
import static java.util.Collections.singletonList;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.startApplicationAndWaitUntilRunning;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForCompletion;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
/**
* Tests related to internal topics in streams
*/
@SuppressWarnings("deprecation")
-@Category({IntegrationTest.class})
+@Timeout(600)
+@Category(IntegrationTest.class)
public class InternalTopicIntegrationTest {
- @Rule
- public Timeout globalTimeout = Timeout.seconds(600);
-
public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);
- @BeforeClass
+ @BeforeAll
public static void startCluster() throws IOException, InterruptedException {
CLUSTER.start();
CLUSTER.createTopics(DEFAULT_INPUT_TOPIC, DEFAULT_INPUT_TABLE_TOPIC);
}
- @AfterClass
+ @AfterAll
public static void closeCluster() {
CLUSTER.stop();
}
@@ -101,7 +98,7 @@ public class InternalTopicIntegrationTest {
private Properties streamsProp;
- @Before
+ @BeforeEach
public void before() {
streamsProp = new Properties();
streamsProp.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
@@ -113,7 +110,7 @@ public class InternalTopicIntegrationTest {
streamsProp.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
}
- @After
+ @AfterEach
public void after() throws IOException {
// Remove any state from previous test runs
IntegrationTestUtils.purgeLocalStreamsState(streamsProp);
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java
index f7a32934b4..1a899d6a5f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java
@@ -43,15 +43,14 @@ import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.MockMapper;
import org.apache.kafka.test.TestUtils;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Rule;
-import org.junit.Test;
import org.junit.experimental.categories.Category;
-import org.junit.rules.TestName;
-import org.junit.rules.Timeout;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.api.Timeout;
import java.io.IOException;
import java.util.Arrays;
@@ -65,23 +64,21 @@ import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.sa
* Similar to KStreamAggregationIntegrationTest but with dedupping enabled
* by virtue of having a large commit interval
*/
-@Category({IntegrationTest.class})
+@Timeout(600)
+@Category(IntegrationTest.class)
@SuppressWarnings("deprecation")
public class KStreamAggregationDedupIntegrationTest {
- @Rule
- public Timeout globalTimeout = Timeout.seconds(600);
-
private static final int NUM_BROKERS = 1;
private static final long COMMIT_INTERVAL_MS = 300L;
public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
- @BeforeClass
+ @BeforeAll
public static void startCluster() throws IOException {
CLUSTER.start();
}
- @AfterClass
+ @AfterAll
public static void closeCluster() {
CLUSTER.stop();
}
@@ -97,15 +94,12 @@ public class KStreamAggregationDedupIntegrationTest {
private Reducer<String> reducer;
private KStream<Integer, String> stream;
- @Rule
- public TestName testName = new TestName();
-
- @Before
- public void before() throws InterruptedException {
+ @BeforeEach
+ public void before(final TestInfo testInfo) throws InterruptedException {
builder = new StreamsBuilder();
- createTopics();
+ createTopics(testInfo);
streamsConfiguration = new Properties();
- final String safeTestName = safeUniqueTestName(getClass(), testName);
+ final String safeTestName = safeUniqueTestName(getClass(), testInfo);
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + safeTestName);
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
@@ -120,7 +114,7 @@ public class KStreamAggregationDedupIntegrationTest {
reducer = (value1, value2) -> value1 + ":" + value2;
}
- @After
+ @AfterEach
public void whenShuttingDown() throws IOException {
if (kafkaStreams != null) {
kafkaStreams.close();
@@ -130,7 +124,7 @@ public class KStreamAggregationDedupIntegrationTest {
@Test
- public void shouldReduce() throws Exception {
+ public void shouldReduce(final TestInfo testInfo) throws Exception {
produceMessages(System.currentTimeMillis());
groupedStream
.reduce(reducer, Materialized.as("reduce-by-key"))
@@ -150,11 +144,12 @@ public class KStreamAggregationDedupIntegrationTest {
new KeyValueTimestamp<>("B", "B:B", timestamp),
new KeyValueTimestamp<>("C", "C:C", timestamp),
new KeyValueTimestamp<>("D", "D:D", timestamp),
- new KeyValueTimestamp<>("E", "E:E", timestamp)));
+ new KeyValueTimestamp<>("E", "E:E", timestamp)),
+ testInfo);
}
@Test
- public void shouldReduceWindowed() throws Exception {
+ public void shouldReduceWindowed(final TestInfo testInfo) throws Exception {
final long firstBatchTimestamp = System.currentTimeMillis() - 1000;
produceMessages(firstBatchTimestamp);
final long secondBatchTimestamp = System.currentTimeMillis();
@@ -186,12 +181,13 @@ public class KStreamAggregationDedupIntegrationTest {
new KeyValueTimestamp<>("D@" + secondBatchWindow, "D:D", secondBatchTimestamp),
new KeyValueTimestamp<>("E@" + firstBatchWindow, "E", firstBatchTimestamp),
new KeyValueTimestamp<>("E@" + secondBatchWindow, "E:E", secondBatchTimestamp)
- )
+ ),
+ testInfo
);
}
@Test
- public void shouldGroupByKey() throws Exception {
+ public void shouldGroupByKey(final TestInfo testInfo) throws Exception {
final long timestamp = mockTime.milliseconds();
produceMessages(timestamp);
produceMessages(timestamp);
@@ -215,7 +211,8 @@ public class KStreamAggregationDedupIntegrationTest {
new KeyValueTimestamp<>("3@" + window, 2L, timestamp),
new KeyValueTimestamp<>("4@" + window, 2L, timestamp),
new KeyValueTimestamp<>("5@" + window, 2L, timestamp)
- )
+ ),
+ testInfo
);
}
@@ -238,8 +235,8 @@ public class KStreamAggregationDedupIntegrationTest {
}
- private void createTopics() throws InterruptedException {
- final String safeTestName = safeUniqueTestName(getClass(), testName);
+ private void createTopics(final TestInfo testInfo) throws InterruptedException {
+ final String safeTestName = safeUniqueTestName(getClass(), testInfo);
streamOneInput = "stream-one-" + safeTestName;
outputTopic = "output-" + safeTestName;
CLUSTER.createTopic(streamOneInput, 3, 1);
@@ -254,10 +251,11 @@ public class KStreamAggregationDedupIntegrationTest {
private <K, V> void validateReceivedMessages(final Deserializer<K> keyDeserializer,
final Deserializer<V> valueDeserializer,
- final List<KeyValueTimestamp<K, V>> expectedRecords)
+ final List<KeyValueTimestamp<K, V>> expectedRecords,
+ final TestInfo testInfo)
throws Exception {
- final String safeTestName = safeUniqueTestName(getClass(), testName);
+ final String safeTestName = safeUniqueTestName(getClass(), testInfo);
final Properties consumerProperties = new Properties();
consumerProperties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
consumerProperties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "group-" + safeTestName);
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
index a740605bd8..33373a7669 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
@@ -63,15 +63,14 @@ import org.apache.kafka.streams.state.ReadOnlySessionStore;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.MockMapper;
import org.apache.kafka.test.TestUtils;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Rule;
-import org.junit.Test;
import org.junit.experimental.categories.Category;
-import org.junit.rules.TestName;
-import org.junit.rules.Timeout;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.api.Timeout;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
@@ -96,25 +95,23 @@ import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.sa
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.Is.is;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
@SuppressWarnings({"unchecked", "deprecation"})
-@Category({IntegrationTest.class})
+@Timeout(600)
+@Category(IntegrationTest.class)
public class KStreamAggregationIntegrationTest {
- @Rule
- public Timeout globalTimeout = Timeout.seconds(600);
-
private static final int NUM_BROKERS = 1;
public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
- @BeforeClass
+ @BeforeAll
public static void startCluster() throws IOException {
CLUSTER.start();
}
- @AfterClass
+ @AfterAll
public static void closeCluster() {
CLUSTER.stop();
}
@@ -133,15 +130,12 @@ public class KStreamAggregationIntegrationTest {
private Aggregator<String, String, Integer> aggregator;
private KStream<Integer, String> stream;
- @Rule
- public TestName testName = new TestName();
-
- @Before
- public void before() throws InterruptedException {
+ @BeforeEach
+ public void before(final TestInfo testInfo) throws InterruptedException {
builder = new StreamsBuilder();
- createTopics();
+ createTopics(testInfo);
streamsConfiguration = new Properties();
- final String safeTestName = safeUniqueTestName(getClass(), testName);
+ final String safeTestName = safeUniqueTestName(getClass(), testInfo);
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + safeTestName);
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
@@ -160,7 +154,7 @@ public class KStreamAggregationIntegrationTest {
aggregator = (aggKey, value, aggregate) -> aggregate + value.length();
}
- @After
+ @AfterEach
public void whenShuttingDown() throws IOException {
if (kafkaStreams != null) {
kafkaStreams.close();
@@ -169,7 +163,7 @@ public class KStreamAggregationIntegrationTest {
}
@Test
- public void shouldReduce() throws Exception {
+ public void shouldReduce(final TestInfo testInfo) throws Exception {
produceMessages(mockTime.milliseconds());
groupedStream
.reduce(reducer, Materialized.as("reduce-by-key"))
@@ -183,7 +177,8 @@ public class KStreamAggregationIntegrationTest {
final List<KeyValueTimestamp<String, String>> results = receiveMessages(
new StringDeserializer(),
new StringDeserializer(),
- 10);
+ 10,
+ testInfo);
results.sort(KStreamAggregationIntegrationTest::compare);
@@ -215,7 +210,7 @@ public class KStreamAggregationIntegrationTest {
@SuppressWarnings("deprecation")
@Test
- public void shouldReduceWindowed() throws Exception {
+ public void shouldReduceWindowed(final TestInfo testInfo) throws Exception {
final long firstBatchTimestamp = mockTime.milliseconds();
mockTime.sleep(1000);
produceMessages(firstBatchTimestamp);
@@ -237,7 +232,8 @@ public class KStreamAggregationIntegrationTest {
new TimeWindowedDeserializer<>(),
new StringDeserializer(),
String.class,
- 15);
+ 15,
+ testInfo);
// read from ConsoleConsumer
final String resultFromConsoleConsumer = readWindowedKeyedMessagesViaConsoleConsumer(
@@ -290,7 +286,7 @@ public class KStreamAggregationIntegrationTest {
}
@Test
- public void shouldAggregate() throws Exception {
+ public void shouldAggregate(final TestInfo testInfo) throws Exception {
produceMessages(mockTime.milliseconds());
groupedStream.aggregate(
initializer,
@@ -306,7 +302,8 @@ public class KStreamAggregationIntegrationTest {
final List<KeyValueTimestamp<String, Integer>> results = receiveMessages(
new StringDeserializer(),
new IntegerDeserializer(),
- 10);
+ 10,
+ testInfo);
results.sort(KStreamAggregationIntegrationTest::compare);
@@ -326,7 +323,7 @@ public class KStreamAggregationIntegrationTest {
@SuppressWarnings("deprecation")
@Test
- public void shouldAggregateWindowed() throws Exception {
+ public void shouldAggregateWindowed(final TestInfo testInfo) throws Exception {
final long firstTimestamp = mockTime.milliseconds();
mockTime.sleep(1000);
produceMessages(firstTimestamp);
@@ -351,7 +348,8 @@ public class KStreamAggregationIntegrationTest {
new TimeWindowedDeserializer<>(new StringDeserializer(), 500L),
new IntegerDeserializer(),
String.class,
- 15);
+ 15,
+ testInfo);
// read from ConsoleConsumer
final String resultFromConsoleConsumer = readWindowedKeyedMessagesViaConsoleConsumer(
@@ -403,7 +401,7 @@ public class KStreamAggregationIntegrationTest {
}
- private void shouldCountHelper() throws Exception {
+ private void shouldCountHelper(final TestInfo testInfo) throws Exception {
startStreams();
produceMessages(mockTime.milliseconds());
@@ -411,7 +409,8 @@ public class KStreamAggregationIntegrationTest {
final List<KeyValueTimestamp<String, Long>> results = receiveMessages(
new StringDeserializer(),
new LongDeserializer(),
- 10);
+ 10,
+ testInfo);
results.sort(KStreamAggregationIntegrationTest::compare);
assertThat(results, is(Arrays.asList(
@@ -429,30 +428,30 @@ public class KStreamAggregationIntegrationTest {
}
@Test
- public void shouldCount() throws Exception {
+ public void shouldCount(final TestInfo testInfo) throws Exception {
produceMessages(mockTime.milliseconds());
groupedStream.count(Materialized.as("count-by-key"))
.toStream()
.to(outputTopic, Produced.with(Serdes.String(), Serdes.Long()));
- shouldCountHelper();
+ shouldCountHelper(testInfo);
}
@Test
- public void shouldCountWithInternalStore() throws Exception {
+ public void shouldCountWithInternalStore(final TestInfo testInfo) throws Exception {
produceMessages(mockTime.milliseconds());
groupedStream.count()
.toStream()
.to(outputTopic, Produced.with(Serdes.String(), Serdes.Long()));
- shouldCountHelper();
+ shouldCountHelper(testInfo);
}
@SuppressWarnings("deprecation")
@Test
- public void shouldGroupByKey() throws Exception {
+ public void shouldGroupByKey(final TestInfo testInfo) throws Exception {
final long timestamp = mockTime.milliseconds();
produceMessages(timestamp);
produceMessages(timestamp);
@@ -468,7 +467,8 @@ public class KStreamAggregationIntegrationTest {
final List<KeyValueTimestamp<String, Long>> results = receiveMessages(
new StringDeserializer(),
new LongDeserializer(),
- 10);
+ 10,
+ testInfo);
results.sort(KStreamAggregationIntegrationTest::compare);
final long window = timestamp / 500 * 500;
@@ -488,7 +488,7 @@ public class KStreamAggregationIntegrationTest {
@SuppressWarnings("deprecation")
@Test
- public void shouldReduceSlidingWindows() throws Exception {
+ public void shouldReduceSlidingWindows(final TestInfo testInfo) throws Exception {
final long firstBatchTimestamp = mockTime.milliseconds();
final long timeDifference = 500L;
produceMessages(firstBatchTimestamp);
@@ -511,7 +511,8 @@ public class KStreamAggregationIntegrationTest {
new TimeWindowedDeserializer<>(new StringDeserializer(), 500L),
new StringDeserializer(),
String.class,
- 30);
+ 30,
+ testInfo);
final String resultFromConsoleConsumer = readWindowedKeyedMessagesViaConsoleConsumer(
new TimeWindowedDeserializer<String>(),
@@ -594,7 +595,7 @@ public class KStreamAggregationIntegrationTest {
@SuppressWarnings("deprecation")
@Test
- public void shouldAggregateSlidingWindows() throws Exception {
+ public void shouldAggregateSlidingWindows(final TestInfo testInfo) throws Exception {
final long firstBatchTimestamp = mockTime.milliseconds();
final long timeDifference = 500L;
produceMessages(firstBatchTimestamp);
@@ -620,7 +621,8 @@ public class KStreamAggregationIntegrationTest {
new TimeWindowedDeserializer<>(),
new IntegerDeserializer(),
String.class,
- 30);
+ 30,
+ testInfo);
// read from ConsoleConsumer
final String resultFromConsoleConsumer = readWindowedKeyedMessagesViaConsoleConsumer(
@@ -1040,8 +1042,8 @@ public class KStreamAggregationIntegrationTest {
}
- private void createTopics() throws InterruptedException {
- final String safeTestName = safeUniqueTestName(getClass(), testName);
+ private void createTopics(final TestInfo testInfo) throws InterruptedException {
+ final String safeTestName = safeUniqueTestName(getClass(), testInfo);
streamOneInput = "stream-one-" + safeTestName;
outputTopic = "output-" + safeTestName;
userSessionsStream = "user-sessions-" + safeTestName;
@@ -1056,19 +1058,21 @@ public class KStreamAggregationIntegrationTest {
private <K, V> List<KeyValueTimestamp<K, V>> receiveMessages(final Deserializer<K> keyDeserializer,
final Deserializer<V> valueDeserializer,
- final int numMessages)
+ final int numMessages,
+ final TestInfo testInfo)
throws Exception {
- return receiveMessages(keyDeserializer, valueDeserializer, null, numMessages);
+ return receiveMessages(keyDeserializer, valueDeserializer, null, numMessages, testInfo);
}
private <K, V> List<KeyValueTimestamp<K, V>> receiveMessages(final Deserializer<K> keyDeserializer,
final Deserializer<V> valueDeserializer,
final Class innerClass,
- final int numMessages)
+ final int numMessages,
+ final TestInfo testInfo)
throws Exception {
- final String safeTestName = safeUniqueTestName(getClass(), testName);
+ final String safeTestName = safeUniqueTestName(getClass(), testInfo);
final Properties consumerProperties = new Properties();
consumerProperties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
consumerProperties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "group-" + safeTestName);
@@ -1090,8 +1094,9 @@ public class KStreamAggregationIntegrationTest {
private <K, V> List<KeyValueTimestamp<K, V>> receiveMessagesWithTimestamp(final Deserializer<K> keyDeserializer,
final Deserializer<V> valueDeserializer,
final Class innerClass,
- final int numMessages) throws Exception {
- final String safeTestName = safeUniqueTestName(getClass(), testName);
+ final int numMessages,
+ final TestInfo testInfo) throws Exception {
+ final String safeTestName = safeUniqueTestName(getClass(), testInfo);
final Properties consumerProperties = new Properties();
consumerProperties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
consumerProperties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "group-" + safeTestName);
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest.java
index 006dd9275f..b7088fc4e0 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest.java
@@ -53,21 +53,19 @@ import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.utils.UniqueTopicSerdeScope;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.TestUtils;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
import kafka.utils.MockTime;
-import org.junit.rules.Timeout;
-
-@Category({IntegrationTest.class})
+import org.junit.experimental.categories.Category;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+@Timeout(600)
+@Category(IntegrationTest.class)
public class KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest {
- @Rule
- public Timeout globalTimeout = Timeout.seconds(600);
private final static int NUM_BROKERS = 1;
public final static EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
@@ -86,7 +84,7 @@ public class KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest {
private final static Properties PRODUCER_CONFIG_1 = new Properties();
private final static Properties PRODUCER_CONFIG_2 = new Properties();
- @BeforeClass
+ @BeforeAll
public static void startCluster() throws IOException, InterruptedException {
CLUSTER.start();
//Use multiple partitions to ensure distribution of keys.
@@ -125,12 +123,12 @@ public class KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest {
CONSUMER_CONFIG.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
}
- @AfterClass
+ @AfterAll
public static void closeCluster() {
CLUSTER.stop();
}
- @Before
+ @BeforeEach
public void before() throws IOException {
final String stateDirBasePath = TestUtils.tempDirectory().getPath();
streamsConfig.put(StreamsConfig.STATE_DIR_CONFIG, stateDirBasePath + "-1");
@@ -138,7 +136,7 @@ public class KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest {
streamsConfigThree.put(StreamsConfig.STATE_DIR_CONFIG, stateDirBasePath + "-3");
}
- @After
+ @AfterEach
public void after() throws IOException {
if (streams != null) {
streams.close();
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KTableSourceTopicRestartIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KTableSourceTopicRestartIntegrationTest.java
index 6b98d77589..5892cdc1b6 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KTableSourceTopicRestartIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KTableSourceTopicRestartIntegrationTest.java
@@ -35,17 +35,17 @@ import org.apache.kafka.streams.processor.StateRestoreListener;
import org.apache.kafka.streams.processor.WallclockTimestampExtractor;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.TestUtils;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Rule;
-import org.junit.Test;
import org.junit.experimental.categories.Category;
-import org.junit.rules.TestName;
-import org.junit.rules.Timeout;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.api.Timeout;
import java.io.IOException;
+import java.lang.reflect.Method;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
@@ -54,10 +54,9 @@ import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
-@Category({IntegrationTest.class})
+@Timeout(600)
+@Category(IntegrationTest.class)
public class KTableSourceTopicRestartIntegrationTest {
- @Rule
- public Timeout globalTimeout = Timeout.seconds(600);
private static final int NUM_BROKERS = 3;
private static final String SOURCE_TOPIC = "source-topic";
private static final Properties PRODUCER_CONFIG = new Properties();
@@ -65,7 +64,7 @@ public class KTableSourceTopicRestartIntegrationTest {
public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
- @BeforeClass
+ @BeforeAll
public static void startCluster() throws IOException {
CLUSTER.start();
STREAMS_CONFIG.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
@@ -84,7 +83,7 @@ public class KTableSourceTopicRestartIntegrationTest {
PRODUCER_CONFIG.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
}
- @AfterClass
+ @AfterAll
public static void closeCluster() {
CLUSTER.stop();
}
@@ -99,15 +98,12 @@ public class KTableSourceTopicRestartIntegrationTest {
private Map<String, String> expectedInitialResultsMap;
private Map<String, String> expectedResultsWithDataWrittenDuringRestoreMap;
- @Rule
- public TestName testName = new TestName();
-
- @Before
- public void before() throws Exception {
- sourceTopic = SOURCE_TOPIC + "-" + testName.getMethodName();
+ @BeforeEach
+ public void before(final TestInfo testInfo) throws Exception {
+ sourceTopic = SOURCE_TOPIC + "-" + testInfo.getTestMethod().map(Method::getName);
CLUSTER.createTopic(sourceTopic);
- STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, IntegrationTestUtils.safeUniqueTestName(getClass(), testName));
+ STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, IntegrationTestUtils.safeUniqueTestName(getClass(), testInfo));
final KTable<String, String> kTable = streamsBuilder.table(sourceTopic, Materialized.as("store"));
kTable.toStream().foreach(readKeyValues::put);
@@ -116,7 +112,7 @@ public class KTableSourceTopicRestartIntegrationTest {
expectedResultsWithDataWrittenDuringRestoreMap = createExpectedResultsMap("a", "b", "c", "d", "f", "g", "h");
}
- @After
+ @AfterEach
public void after() throws Exception {
IntegrationTestUtils.purgeLocalStreamsState(STREAMS_CONFIG);
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RackAwarenessIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/RackAwarenessIntegrationTest.java
index 54736c44a2..cb21db9d88 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/RackAwarenessIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/RackAwarenessIntegrationTest.java
@@ -33,14 +33,13 @@ import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.TestUtils;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Rule;
-import org.junit.Test;
import org.junit.experimental.categories.Category;
-import org.junit.rules.TestName;
-import org.junit.rules.Timeout;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.api.Timeout;
import java.io.IOException;
import java.time.Duration;
@@ -57,13 +56,12 @@ import java.util.stream.Stream;
import static java.util.Arrays.asList;
import static java.util.Collections.singletonList;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
-@Category({IntegrationTest.class})
+@Timeout(600)
+@Category(IntegrationTest.class)
public class RackAwarenessIntegrationTest {
- @Rule
- public Timeout globalTimeout = Timeout.seconds(600);
private static final int NUM_BROKERS = 1;
private static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
@@ -78,9 +76,6 @@ public class RackAwarenessIntegrationTest {
private static final int DEFAULT_NUMBER_OF_STATEFUL_SUB_TOPOLOGIES = 1;
private static final int DEFAULT_NUMBER_OF_PARTITIONS_OF_SUB_TOPOLOGIES = 2;
- @Rule
- public TestName testName = new TestName();
-
private static final String INPUT_TOPIC = "input-topic";
private static final String TAG_ZONE = "zone";
@@ -90,17 +85,17 @@ public class RackAwarenessIntegrationTest {
private Properties baseConfiguration;
private Topology topology;
- @BeforeClass
+ @BeforeAll
public static void createTopics() throws Exception {
CLUSTER.start();
CLUSTER.createTopic(INPUT_TOPIC, 6, 1);
}
- @Before
- public void setup() {
+ @BeforeEach
+ public void setup(final TestInfo testInfo) {
kafkaStreamsInstances = new ArrayList<>();
baseConfiguration = new Properties();
- final String safeTestName = safeUniqueTestName(getClass(), testName);
+ final String safeTestName = safeUniqueTestName(getClass(), testInfo);
final String applicationId = "app-" + safeTestName;
baseConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
baseConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
@@ -109,7 +104,7 @@ public class RackAwarenessIntegrationTest {
baseConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
}
- @After
+ @AfterEach
public void cleanup() throws IOException {
for (final KafkaStreamsWithConfiguration kafkaStreamsWithConfiguration : kafkaStreamsInstances) {
kafkaStreamsWithConfiguration.kafkaStreams.close(Duration.ofMillis(IntegrationTestUtils.DEFAULT_TIMEOUT));
@@ -139,10 +134,10 @@ public class RackAwarenessIntegrationTest {
assertEquals(StreamsConfig.MAX_RACK_AWARE_ASSIGNMENT_TAG_LIST_SIZE, clientTagKeys.size());
Stream.of(clientTags1, clientTags2)
- .forEach(clientTags -> assertEquals(String.format("clientsTags with content '%s' " +
- "did not match expected size", clientTags),
- StreamsConfig.MAX_RACK_AWARE_ASSIGNMENT_TAG_LIST_SIZE,
- clientTags.size()));
+ .forEach(clientTags -> assertEquals(StreamsConfig.MAX_RACK_AWARE_ASSIGNMENT_TAG_LIST_SIZE,
+ clientTags.size(),
+ String.format("clientsTags with content '%s' " +
+ "did not match expected size", clientTags)));
createAndStart(clientTags1, clientTagKeys, numberOfStandbyReplicas);
createAndStart(clientTags1, clientTagKeys, numberOfStandbyReplicas);
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
index dc0be1a1c7..243264e3fa 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
@@ -56,13 +56,14 @@ import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.CoreMatchers;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Rule;
-import org.junit.Test;
import org.junit.experimental.categories.Category;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.api.Timeout;
import java.io.File;
import java.io.IOException;
@@ -74,8 +75,6 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
-import org.junit.rules.TestName;
-import org.junit.rules.Timeout;
import static java.util.Arrays.asList;
import static java.util.Collections.singletonList;
@@ -87,37 +86,34 @@ import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.wa
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForStandbyCompletion;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.IsEqual.equalTo;
-import static org.junit.Assert.assertTrue;
+import static org.junit.jupiter.api.Assertions.assertTrue;
-@Category({IntegrationTest.class})
+@Timeout(600)
+@Category(IntegrationTest.class)
public class RestoreIntegrationTest {
- @Rule
- public Timeout globalTimeout = Timeout.seconds(600);
private static final int NUM_BROKERS = 1;
public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
- @BeforeClass
+ @BeforeAll
public static void startCluster() throws IOException {
CLUSTER.start();
}
- @AfterClass
+ @AfterAll
public static void closeCluster() {
CLUSTER.stop();
}
- @Rule
- public final TestName testName = new TestName();
private String appId;
private String inputStream;
private final int numberOfKeys = 10000;
private KafkaStreams kafkaStreams;
- @Before
- public void createTopics() throws InterruptedException {
- appId = safeUniqueTestName(RestoreIntegrationTest.class, testName);
+ @BeforeEach
+ public void createTopics(final TestInfo testInfo) throws InterruptedException {
+ appId = safeUniqueTestName(RestoreIntegrationTest.class, testInfo);
inputStream = appId + "-input-stream";
CLUSTER.createTopic(inputStream, 2, 1);
}
@@ -135,7 +131,7 @@ public class RestoreIntegrationTest {
return streamsConfiguration;
}
- @After
+ @AfterEach
public void shutdown() {
if (kafkaStreams != null) {
kafkaStreams.close(Duration.ofSeconds(30));
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
index 31baae96ba..689b1c0beb 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
@@ -58,6 +58,7 @@ import org.apache.kafka.streams.query.StateQueryResult;
import org.apache.kafka.streams.state.QueryableStoreType;
import org.apache.kafka.test.TestCondition;
import org.apache.kafka.test.TestUtils;
+import org.junit.jupiter.api.TestInfo;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -66,6 +67,7 @@ import scala.Option;
import java.io.File;
import java.io.IOException;
import java.lang.reflect.Field;
+import java.lang.reflect.Method;
import java.nio.file.Paths;
import java.time.Duration;
import java.util.ArrayList;
@@ -225,9 +227,23 @@ public class IntegrationTestUtils {
/**
* Gives a test name that is safe to be used in application ids, topic names, etc.
* The name is safe even for parameterized methods.
+ * Used by tests not yet migrated from JUnit 4.
*/
public static String safeUniqueTestName(final Class<?> testClass, final TestName testName) {
- return (testClass.getSimpleName() + testName.getMethodName())
+ return safeUniqueTestName(testClass, testName.getMethodName());
+ }
+
+ /**
+ * Same as @see IntegrationTestUtils#safeUniqueTestName except it accepts a TestInfo passed in by
+ * JUnit 5 instead of a TestName from JUnit 4.
+ * Used by tests migrated to JUnit 5.
+ */
+ public static String safeUniqueTestName(final Class<?> testClass, final TestInfo testInfo) {
+ return safeUniqueTestName(testClass, testInfo.getTestMethod().map(Method::getName).orElse(""));
+ }
+
+ private static String safeUniqueTestName(final Class<?> testClass, final String methodName) {
+ return (testClass.getSimpleName() + methodName)
.replace(':', '_')
.replace('.', '_')
.replace('[', '_')