You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ca...@apache.org on 2022/07/21 15:28:01 UTC

[kafka] branch trunk updated: KAFKA-14001: Migrate streams module to JUnit 5 - Part 1 (#12285)

This is an automated email from the ASF dual-hosted git repository.

cadonna pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 569a358a3f KAFKA-14001: Migrate streams module to JUnit 5 - Part 1 (#12285)
569a358a3f is described below

commit 569a358a3f9c4d2aa7aaeb277f691ad0bae5db34
Author: Christo Lolov <lo...@amazon.com>
AuthorDate: Thu Jul 21 16:27:53 2022 +0100

    KAFKA-14001: Migrate streams module to JUnit 5 - Part 1 (#12285)
    
    This pull request addresses https://issues.apache.org/jira/browse/KAFKA-14001. It is the first in a series of pull requests that migrate the Kafka Streams tests from JUnit 4 to JUnit 5.
    
    Reviewers: Divij Vaidya <di...@amazon.com>, Bruno Cadonna <ca...@apache.org>
---
 .../integration/AdjustStreamThreadCountTest.java   |  47 ++++-----
 .../integration/EmitOnChangeIntegrationTest.java   |  30 +++---
 .../FineGrainedAutoResetIntegrationTest.java       |  24 +++--
 .../integration/GlobalKTableIntegrationTest.java   |  44 ++++-----
 .../integration/GlobalThreadShutDownOrderTest.java |  37 +++----
 ...ighAvailabilityTaskAssignorIntegrationTest.java |  36 +++----
 .../streams/integration/IQv2IntegrationTest.java   |  43 ++++----
 .../integration/InternalTopicIntegrationTest.java  |  31 +++---
 .../KStreamAggregationDedupIntegrationTest.java    |  62 ++++++------
 .../KStreamAggregationIntegrationTest.java         | 109 +++++++++++----------
 ...yInnerJoinCustomPartitionerIntegrationTest.java |  30 +++---
 .../KTableSourceTopicRestartIntegrationTest.java   |  38 ++++---
 .../integration/RackAwarenessIntegrationTest.java  |  43 ++++----
 .../integration/RestoreIntegrationTest.java        |  36 +++----
 .../integration/utils/IntegrationTestUtils.java    |  18 +++-
 15 files changed, 296 insertions(+), 332 deletions(-)

diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java
index c8c30f6837..bf64068b94 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java
@@ -34,17 +34,16 @@ import org.apache.kafka.streams.processor.PunctuationType;
 import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
 import org.apache.kafka.test.IntegrationTest;
 import org.apache.kafka.test.TestUtils;
+import org.junit.experimental.categories.Category;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.api.Timeout;
 
 import java.util.concurrent.atomic.AtomicBoolean;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-import org.junit.rules.TestName;
-import org.junit.rules.Timeout;
 
 import java.io.IOException;
 import java.time.Duration;
@@ -69,33 +68,27 @@ import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.CoreMatchers.not;
 import static org.hamcrest.MatcherAssert.assertThat;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertThrows;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
 
+@Timeout(600)
 @Category(IntegrationTest.class)
 public class AdjustStreamThreadCountTest {
-    @Rule
-    public Timeout globalTimeout = Timeout.seconds(600);
-
     public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);
 
-    @BeforeClass
+    @BeforeAll
     public static void startCluster() throws IOException {
         CLUSTER.start();
     }
 
-    @AfterClass
+    @AfterAll
     public static void closeCluster() {
         CLUSTER.stop();
     }
 
-
-    @Rule
-    public TestName testName = new TestName();
-
     private final List<KafkaStreams.State> stateTransitionHistory = new ArrayList<>();
     private static String inputTopic;
     private static StreamsBuilder builder;
@@ -103,9 +96,9 @@ public class AdjustStreamThreadCountTest {
     private static String appId = "";
     public static final Duration DEFAULT_DURATION = Duration.ofSeconds(30);
 
-    @Before
-    public void setup() {
-        final String testId = safeUniqueTestName(getClass(), testName);
+    @BeforeEach
+    public void setup(final TestInfo testInfo) {
+        final String testId = safeUniqueTestName(getClass(), testInfo);
         appId = "appId_" + testId;
         inputTopic = "input" + testId;
         IntegrationTestUtils.cleanStateBeforeTest(CLUSTER, inputTopic);
@@ -131,7 +124,7 @@ public class AdjustStreamThreadCountTest {
         waitForRunning();
     }
 
-    @After
+    @AfterEach
     public void teardown() throws IOException {
         purgeLocalStreamsState(properties);
     }
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/EmitOnChangeIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/EmitOnChangeIntegrationTest.java
index f41c95a6bb..c7a545068f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/EmitOnChangeIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/EmitOnChangeIntegrationTest.java
@@ -33,14 +33,13 @@ import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.test.IntegrationTest;
 import org.apache.kafka.test.StreamsTestUtils;
 import org.apache.kafka.test.TestUtils;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Rule;
-import org.junit.Test;
 import org.junit.experimental.categories.Category;
-import org.junit.rules.TestName;
-import org.junit.rules.Timeout;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.api.Timeout;
 
 import java.io.IOException;
 import java.util.Arrays;
@@ -52,35 +51,30 @@ import static org.apache.kafka.common.utils.Utils.mkMap;
 import static org.apache.kafka.common.utils.Utils.mkObjectProperties;
 import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
 
+@Timeout(600)
 @Category(IntegrationTest.class)
 public class EmitOnChangeIntegrationTest {
-    @Rule
-    public Timeout globalTimeout = Timeout.seconds(600);
-
     private static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);
 
-    @BeforeClass
+    @BeforeAll
     public static void startCluster() throws IOException {
         CLUSTER.start();
     }
 
-    @AfterClass
+    @AfterAll
     public static void closeCluster() {
         CLUSTER.stop();
     }
 
-    @Rule
-    public TestName testName = new TestName();
-
     private static String inputTopic;
     private static String inputTopic2;
     private static String outputTopic;
     private static String outputTopic2;
     private static String appId = "";
 
-    @Before
-    public void setup() {
-        final String testId = safeUniqueTestName(getClass(), testName);
+    @BeforeEach
+    public void setup(final TestInfo testInfo) {
+        final String testId = safeUniqueTestName(getClass(), testInfo);
         appId = "appId_" + testId;
         inputTopic = "input" + testId;
         inputTopic2 = "input2" + testId;
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/FineGrainedAutoResetIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/FineGrainedAutoResetIntegrationTest.java
index 05f2359efc..0d439a32ba 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/FineGrainedAutoResetIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/FineGrainedAutoResetIntegrationTest.java
@@ -41,13 +41,12 @@ import org.apache.kafka.streams.kstream.Produced;
 import org.apache.kafka.test.IntegrationTest;
 import org.apache.kafka.test.StreamsTestUtils;
 import org.apache.kafka.test.TestUtils;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Rule;
-import org.junit.Test;
 import org.junit.experimental.categories.Category;
-import org.junit.rules.Timeout;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -62,12 +61,11 @@ import java.util.regex.Pattern;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
-import static org.junit.Assert.fail;
+import static org.junit.jupiter.api.Assertions.fail;
 
-@Category({IntegrationTest.class})
+@Timeout(600)
+@Category(IntegrationTest.class)
 public class FineGrainedAutoResetIntegrationTest {
-    @Rule
-    public Timeout globalTimeout = Timeout.seconds(600);
     private static final int NUM_BROKERS = 1;
     private static final String DEFAULT_OUTPUT_TOPIC = "outputTopic";
     private static final String OUTPUT_TOPIC_0 = "outputTopic_0";
@@ -76,7 +74,7 @@ public class FineGrainedAutoResetIntegrationTest {
 
     public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
 
-    @BeforeClass
+    @BeforeAll
     public static void startCluster() throws IOException, InterruptedException {
         CLUSTER.start();
         CLUSTER.createTopics(
@@ -105,7 +103,7 @@ public class FineGrainedAutoResetIntegrationTest {
                 OUTPUT_TOPIC_2);
     }
 
-    @AfterClass
+    @AfterAll
     public static void closeCluster() {
         CLUSTER.stop();
     }
@@ -143,7 +141,7 @@ public class FineGrainedAutoResetIntegrationTest {
     private final String topicYTestMessage = "topic-Y test";
     private final String topicZTestMessage = "topic-Z test";
 
-    @Before
+    @BeforeEach
     public void setUp() throws IOException {
 
         final Properties props = new Properties();
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
index d2c92a761d..c55dbfb275 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
@@ -44,15 +44,14 @@ import org.apache.kafka.streams.state.ValueAndTimestamp;
 import org.apache.kafka.test.IntegrationTest;
 import org.apache.kafka.test.MockApiProcessorSupplier;
 import org.apache.kafka.test.TestUtils;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Rule;
-import org.junit.Test;
 import org.junit.experimental.categories.Category;
-import org.junit.rules.TestName;
-import org.junit.rules.Timeout;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.api.Timeout;
 
 import java.io.IOException;
 import java.time.Duration;
@@ -66,23 +65,21 @@ import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.sa
 import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForApplicationState;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.core.IsEqual.equalTo;
-import static org.junit.Assert.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
 
-@Category({IntegrationTest.class})
+@Timeout(600)
+@Category(IntegrationTest.class)
 public class GlobalKTableIntegrationTest {
-    @Rule
-    public Timeout globalTimeout = Timeout.seconds(600);
-
     private static final int NUM_BROKERS = 1;
 
     public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
 
-    @BeforeClass
+    @BeforeAll
     public static void startCluster() throws IOException {
         CLUSTER.start();
     }
 
-    @AfterClass
+    @AfterAll
     public static void closeCluster() {
         CLUSTER.stop();
     }
@@ -101,15 +98,12 @@ public class GlobalKTableIntegrationTest {
     private KStream<String, Long> stream;
     private MockApiProcessorSupplier<String, String, Void, Void> supplier;
 
-    @Rule
-    public TestName testName = new TestName();
-
-    @Before
-    public void before() throws Exception {
+    @BeforeEach
+    public void before(final TestInfo testInfo) throws Exception {
         builder = new StreamsBuilder();
-        createTopics();
+        createTopics(testInfo);
         streamsConfiguration = new Properties();
-        final String safeTestName = safeUniqueTestName(getClass(), testName);
+        final String safeTestName = safeUniqueTestName(getClass(), testInfo);
         streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + safeTestName);
         streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
         streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
@@ -125,7 +119,7 @@ public class GlobalKTableIntegrationTest {
         supplier = new MockApiProcessorSupplier<>();
     }
 
-    @After
+    @AfterEach
     public void whenShuttingDown() throws Exception {
         if (kafkaStreams != null) {
             kafkaStreams.close();
@@ -348,8 +342,8 @@ public class GlobalKTableIntegrationTest {
         kafkaStreams.close();
     }
 
-    private void createTopics() throws Exception {
-        final String safeTestName = safeUniqueTestName(getClass(), testName);
+    private void createTopics(final TestInfo testInfo) throws Exception {
+        final String safeTestName = safeUniqueTestName(getClass(), testInfo);
         streamTopic = "stream-" + safeTestName;
         globalTableTopic = "globalTable-" + safeTestName;
         CLUSTER.createTopics(streamTopic);
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalThreadShutDownOrderTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalThreadShutDownOrderTest.java
index 6f4bd53427..fcf5bb1e6c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalThreadShutDownOrderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalThreadShutDownOrderTest.java
@@ -39,15 +39,14 @@ import org.apache.kafka.streams.state.internals.KeyValueStoreBuilder;
 import org.apache.kafka.test.IntegrationTest;
 import org.apache.kafka.test.MockApiProcessorSupplier;
 import org.apache.kafka.test.TestUtils;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Rule;
-import org.junit.Test;
 import org.junit.experimental.categories.Category;
-import org.junit.rules.TestName;
-import org.junit.rules.Timeout;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.api.Timeout;
 
 import java.io.IOException;
 import java.time.Duration;
@@ -58,7 +57,7 @@ import java.util.Properties;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
-import static org.junit.Assert.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
 
 
 /**
@@ -70,10 +69,9 @@ import static org.junit.Assert.assertEquals;
  * Otherwise if the GlobalStreamThread were to close underneath the StreamThread
  * an exception would be thrown as the GlobalStreamThread closes all global stores on closing.
  */
-@Category({IntegrationTest.class})
+@Timeout(600)
+@Category(IntegrationTest.class)
 public class GlobalThreadShutDownOrderTest {
-    @Rule
-    public Timeout globalTimeout = Timeout.seconds(600);
     private static final int NUM_BROKERS = 1;
     private static final Properties BROKER_CONFIG;
 
@@ -87,12 +85,12 @@ public class GlobalThreadShutDownOrderTest {
 
     public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS, BROKER_CONFIG);
 
-    @BeforeClass
+    @BeforeAll
     public static void startCluster() throws IOException {
         CLUSTER.start();
     }
 
-    @AfterClass
+    @AfterAll
     public static void closeCluster() {
         CLUSTER.stop();
     }
@@ -108,15 +106,12 @@ public class GlobalThreadShutDownOrderTest {
     private final List<Long> retrievedValuesList = new ArrayList<>();
     private boolean firstRecordProcessed;
 
-    @Rule
-    public TestName testName = new TestName();
-
-    @Before
-    public void before() throws Exception {
+    @BeforeEach
+    public void before(final TestInfo testInfo) throws Exception {
         builder = new StreamsBuilder();
         createTopics();
         streamsConfiguration = new Properties();
-        final String safeTestName = safeUniqueTestName(getClass(), testName);
+        final String safeTestName = safeUniqueTestName(getClass(), testInfo);
         streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + safeTestName);
         streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
         streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
@@ -145,7 +140,7 @@ public class GlobalThreadShutDownOrderTest {
 
     }
 
-    @After
+    @AfterEach
     public void after() throws Exception {
         if (kafkaStreams != null) {
             kafkaStreams.close();
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/HighAvailabilityTaskAssignorIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/HighAvailabilityTaskAssignorIntegrationTest.java
index d712dfa3fc..44ca7d461d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/HighAvailabilityTaskAssignorIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/HighAvailabilityTaskAssignorIntegrationTest.java
@@ -43,13 +43,12 @@ import org.apache.kafka.streams.state.Stores;
 import org.apache.kafka.test.IntegrationTest;
 import org.apache.kafka.test.NoRetryException;
 import org.apache.kafka.test.TestUtils;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Rule;
-import org.junit.Test;
 import org.junit.experimental.categories.Category;
-import org.junit.rules.TestName;
-import org.junit.rules.Timeout;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.api.Timeout;
 
 import java.io.IOException;
 import java.util.Collection;
@@ -73,43 +72,38 @@ import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.sa
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.is;
 
+@Timeout(600)
 @Category(IntegrationTest.class)
 public class HighAvailabilityTaskAssignorIntegrationTest {
-    @Rule
-    public Timeout globalTimeout = Timeout.seconds(600);
-
     public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);
 
-    @BeforeClass
+    @BeforeAll
     public static void startCluster() throws IOException {
         CLUSTER.start();
     }
 
-    @AfterClass
+    @AfterAll
     public static void closeCluster() {
         CLUSTER.stop();
     }
 
-
-    @Rule
-    public TestName testName = new TestName();
-
     @Test
-    public void shouldScaleOutWithWarmupTasksAndInMemoryStores() throws InterruptedException {
+    public void shouldScaleOutWithWarmupTasksAndInMemoryStores(final TestInfo testInfo) throws InterruptedException {
         // NB: this test takes at least a minute to run, because it needs a probing rebalance, and the minimum
         // value is one minute
-        shouldScaleOutWithWarmupTasks(storeName -> Materialized.as(Stores.inMemoryKeyValueStore(storeName)));
+        shouldScaleOutWithWarmupTasks(storeName -> Materialized.as(Stores.inMemoryKeyValueStore(storeName)), testInfo);
     }
 
     @Test
-    public void shouldScaleOutWithWarmupTasksAndPersistentStores() throws InterruptedException {
+    public void shouldScaleOutWithWarmupTasksAndPersistentStores(final TestInfo testInfo) throws InterruptedException {
         // NB: this test takes at least a minute to run, because it needs a probing rebalance, and the minimum
         // value is one minute
-        shouldScaleOutWithWarmupTasks(storeName -> Materialized.as(Stores.persistentKeyValueStore(storeName)));
+        shouldScaleOutWithWarmupTasks(storeName -> Materialized.as(Stores.persistentKeyValueStore(storeName)), testInfo);
     }
 
-    private void shouldScaleOutWithWarmupTasks(final Function<String, Materialized<Object, Object, KeyValueStore<Bytes, byte[]>>> materializedFunction) throws InterruptedException {
-        final String testId = safeUniqueTestName(getClass(), testName);
+    private void shouldScaleOutWithWarmupTasks(final Function<String, Materialized<Object, Object, KeyValueStore<Bytes, byte[]>>> materializedFunction,
+                                               final TestInfo testInfo) throws InterruptedException {
+        final String testId = safeUniqueTestName(getClass(), testInfo);
         final String appId = "appId_" + System.currentTimeMillis() + "_" + testId;
         final String inputTopic = "input" + testId;
         final Set<TopicPartition> inputTopicPartitions = mkSet(
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/IQv2IntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/IQv2IntegrationTest.java
index 5d31398e38..e223b7e43c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/IQv2IntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/IQv2IntegrationTest.java
@@ -55,15 +55,14 @@ import org.apache.kafka.streams.state.ValueAndTimestamp;
 import org.apache.kafka.streams.state.internals.StoreQueryUtils;
 import org.apache.kafka.test.IntegrationTest;
 import org.apache.kafka.test.TestUtils;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Rule;
-import org.junit.Test;
 import org.junit.experimental.categories.Category;
-import org.junit.rules.TestName;
-import org.junit.rules.Timeout;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.api.Timeout;
 
 import java.io.IOException;
 import java.lang.reflect.Field;
@@ -88,10 +87,9 @@ import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.matchesPattern;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 
-@Category({IntegrationTest.class})
+@Timeout(600)
+@Category(IntegrationTest.class)
 public class IQv2IntegrationTest {
-    @Rule
-    public Timeout globalTimeout = Timeout.seconds(600);
     private static final int NUM_BROKERS = 1;
     public static final Duration WINDOW_SIZE = Duration.ofMinutes(5);
     private static int port = 0;
@@ -103,10 +101,7 @@ public class IQv2IntegrationTest {
 
     private KafkaStreams kafkaStreams;
 
-    @Rule
-    public TestName testName = new TestName();
-
-    @BeforeClass
+    @BeforeAll
     public static void before()
         throws InterruptedException, IOException, ExecutionException, TimeoutException {
         CLUSTER.start();
@@ -155,8 +150,8 @@ public class IQv2IntegrationTest {
         ));
     }
 
-    @Before
-    public void beforeTest() {
+    @BeforeEach
+    public void beforeTest(final TestInfo testInfo) {
         final StreamsBuilder builder = new StreamsBuilder();
 
         builder.table(
@@ -165,17 +160,17 @@ public class IQv2IntegrationTest {
             Materialized.as(STORE_NAME)
         );
 
-        kafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration());
+        kafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration(testInfo));
         kafkaStreams.cleanUp();
     }
 
-    @After
+    @AfterEach
     public void afterTest() {
         kafkaStreams.close();
         kafkaStreams.cleanUp();
     }
 
-    @AfterClass
+    @AfterAll
     public static void after() {
         CLUSTER.stop();
     }
@@ -295,7 +290,7 @@ public class IQv2IntegrationTest {
     }
 
     @Test
-    public void shouldNotRequireQueryHandler() {
+    public void shouldNotRequireQueryHandler(final TestInfo testInfo) {
         final KeyQuery<Integer, ValueAndTimestamp<Integer>> query = KeyQuery.withKey(1);
         final int partition = 1;
         final Set<Integer> partitions = singleton(partition);
@@ -424,7 +419,7 @@ public class IQv2IntegrationTest {
             })
         );
 
-        kafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration());
+        kafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration(testInfo));
         kafkaStreams.cleanUp();
 
         kafkaStreams.start();
@@ -442,8 +437,8 @@ public class IQv2IntegrationTest {
     }
 
 
-    private Properties streamsConfiguration() {
-        final String safeTestName = IntegrationTestUtils.safeUniqueTestName(getClass(), testName);
+    private Properties streamsConfiguration(final TestInfo testInfo) {
+        final String safeTestName = IntegrationTestUtils.safeUniqueTestName(getClass(), testInfo);
 
         final Properties config = new Properties();
         config.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE);
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
index 98d6c7c4bd..f5db920a46 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
@@ -44,14 +44,13 @@ import org.apache.kafka.streams.state.WindowStore;
 import org.apache.kafka.test.IntegrationTest;
 import org.apache.kafka.test.MockMapper;
 import org.apache.kafka.test.TestUtils;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Rule;
-import org.junit.Test;
 import org.junit.experimental.categories.Category;
-import org.junit.rules.Timeout;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
 
 import java.io.IOException;
 import java.util.Arrays;
@@ -67,27 +66,25 @@ import static java.time.Duration.ofSeconds;
 import static java.util.Collections.singletonList;
 import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.startApplicationAndWaitUntilRunning;
 import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForCompletion;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 /**
  * Tests related to internal topics in streams
  */
 @SuppressWarnings("deprecation")
-@Category({IntegrationTest.class})
+@Timeout(600)
+@Category(IntegrationTest.class)
 public class InternalTopicIntegrationTest {
-    @Rule
-    public Timeout globalTimeout = Timeout.seconds(600);
-
     public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);
 
-    @BeforeClass
+    @BeforeAll
     public static void startCluster() throws IOException, InterruptedException {
         CLUSTER.start();
         CLUSTER.createTopics(DEFAULT_INPUT_TOPIC, DEFAULT_INPUT_TABLE_TOPIC);
     }
 
-    @AfterClass
+    @AfterAll
     public static void closeCluster() {
         CLUSTER.stop();
     }
@@ -101,7 +98,7 @@ public class InternalTopicIntegrationTest {
 
     private Properties streamsProp;
 
-    @Before
+    @BeforeEach
     public void before() {
         streamsProp = new Properties();
         streamsProp.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
@@ -113,7 +110,7 @@ public class InternalTopicIntegrationTest {
         streamsProp.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
     }
 
-    @After
+    @AfterEach
     public void after() throws IOException {
         // Remove any state from previous test runs
         IntegrationTestUtils.purgeLocalStreamsState(streamsProp);
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java
index f7a32934b4..1a899d6a5f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java
@@ -43,15 +43,14 @@ import org.apache.kafka.streams.kstream.TimeWindows;
 import org.apache.kafka.test.IntegrationTest;
 import org.apache.kafka.test.MockMapper;
 import org.apache.kafka.test.TestUtils;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Rule;
-import org.junit.Test;
 import org.junit.experimental.categories.Category;
-import org.junit.rules.TestName;
-import org.junit.rules.Timeout;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.api.Timeout;
 
 import java.io.IOException;
 import java.util.Arrays;
@@ -65,23 +64,21 @@ import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.sa
  * Similar to KStreamAggregationIntegrationTest but with dedupping enabled
  * by virtue of having a large commit interval
  */
-@Category({IntegrationTest.class})
+@Timeout(600)
+@Category(IntegrationTest.class)
 @SuppressWarnings("deprecation")
 public class KStreamAggregationDedupIntegrationTest {
-    @Rule
-    public Timeout globalTimeout = Timeout.seconds(600);
-
     private static final int NUM_BROKERS = 1;
     private static final long COMMIT_INTERVAL_MS = 300L;
 
     public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
 
-    @BeforeClass
+    @BeforeAll
     public static void startCluster() throws IOException {
         CLUSTER.start();
     }
 
-    @AfterClass
+    @AfterAll
     public static void closeCluster() {
         CLUSTER.stop();
     }
@@ -97,15 +94,12 @@ public class KStreamAggregationDedupIntegrationTest {
     private Reducer<String> reducer;
     private KStream<Integer, String> stream;
 
-    @Rule
-    public TestName testName = new TestName();
-
-    @Before
-    public void before() throws InterruptedException {
+    @BeforeEach
+    public void before(final TestInfo testInfo) throws InterruptedException {
         builder = new StreamsBuilder();
-        createTopics();
+        createTopics(testInfo);
         streamsConfiguration = new Properties();
-        final String safeTestName = safeUniqueTestName(getClass(), testName);
+        final String safeTestName = safeUniqueTestName(getClass(), testInfo);
         streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + safeTestName);
         streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
         streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
@@ -120,7 +114,7 @@ public class KStreamAggregationDedupIntegrationTest {
         reducer = (value1, value2) -> value1 + ":" + value2;
     }
 
-    @After
+    @AfterEach
     public void whenShuttingDown() throws IOException {
         if (kafkaStreams != null) {
             kafkaStreams.close();
@@ -130,7 +124,7 @@ public class KStreamAggregationDedupIntegrationTest {
 
 
     @Test
-    public void shouldReduce() throws Exception {
+    public void shouldReduce(final TestInfo testInfo) throws Exception {
         produceMessages(System.currentTimeMillis());
         groupedStream
                 .reduce(reducer, Materialized.as("reduce-by-key"))
@@ -150,11 +144,12 @@ public class KStreamAggregationDedupIntegrationTest {
                     new KeyValueTimestamp<>("B", "B:B", timestamp),
                     new KeyValueTimestamp<>("C", "C:C", timestamp),
                     new KeyValueTimestamp<>("D", "D:D", timestamp),
-                    new KeyValueTimestamp<>("E", "E:E", timestamp)));
+                    new KeyValueTimestamp<>("E", "E:E", timestamp)),
+                testInfo);
     }
 
     @Test
-    public void shouldReduceWindowed() throws Exception {
+    public void shouldReduceWindowed(final TestInfo testInfo) throws Exception {
         final long firstBatchTimestamp = System.currentTimeMillis() - 1000;
         produceMessages(firstBatchTimestamp);
         final long secondBatchTimestamp = System.currentTimeMillis();
@@ -186,12 +181,13 @@ public class KStreamAggregationDedupIntegrationTest {
                     new KeyValueTimestamp<>("D@" + secondBatchWindow, "D:D", secondBatchTimestamp),
                     new KeyValueTimestamp<>("E@" + firstBatchWindow, "E", firstBatchTimestamp),
                     new KeyValueTimestamp<>("E@" + secondBatchWindow, "E:E", secondBatchTimestamp)
-                )
+                ),
+                testInfo
         );
     }
 
     @Test
-    public void shouldGroupByKey() throws Exception {
+    public void shouldGroupByKey(final TestInfo testInfo) throws Exception {
         final long timestamp = mockTime.milliseconds();
         produceMessages(timestamp);
         produceMessages(timestamp);
@@ -215,7 +211,8 @@ public class KStreamAggregationDedupIntegrationTest {
                     new KeyValueTimestamp<>("3@" + window, 2L, timestamp),
                     new KeyValueTimestamp<>("4@" + window, 2L, timestamp),
                     new KeyValueTimestamp<>("5@" + window, 2L, timestamp)
-                )
+                ),
+                testInfo
         );
     }
 
@@ -238,8 +235,8 @@ public class KStreamAggregationDedupIntegrationTest {
     }
 
 
-    private void createTopics() throws InterruptedException {
-        final String safeTestName = safeUniqueTestName(getClass(), testName);
+    private void createTopics(final TestInfo testInfo) throws InterruptedException {
+        final String safeTestName = safeUniqueTestName(getClass(), testInfo);
         streamOneInput = "stream-one-" + safeTestName;
         outputTopic = "output-" + safeTestName;
         CLUSTER.createTopic(streamOneInput, 3, 1);
@@ -254,10 +251,11 @@ public class KStreamAggregationDedupIntegrationTest {
 
     private <K, V> void validateReceivedMessages(final Deserializer<K> keyDeserializer,
                                                  final Deserializer<V> valueDeserializer,
-                                                 final List<KeyValueTimestamp<K, V>> expectedRecords)
+                                                 final List<KeyValueTimestamp<K, V>> expectedRecords,
+                                                 final TestInfo testInfo)
             throws Exception {
 
-        final String safeTestName = safeUniqueTestName(getClass(), testName);
+        final String safeTestName = safeUniqueTestName(getClass(), testInfo);
         final Properties consumerProperties = new Properties();
         consumerProperties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
         consumerProperties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "group-" + safeTestName);
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
index a740605bd8..33373a7669 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
@@ -63,15 +63,14 @@ import org.apache.kafka.streams.state.ReadOnlySessionStore;
 import org.apache.kafka.test.IntegrationTest;
 import org.apache.kafka.test.MockMapper;
 import org.apache.kafka.test.TestUtils;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Rule;
-import org.junit.Test;
 import org.junit.experimental.categories.Category;
-import org.junit.rules.TestName;
-import org.junit.rules.Timeout;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.api.Timeout;
 
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
@@ -96,25 +95,23 @@ import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.sa
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.core.Is.is;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 @SuppressWarnings({"unchecked", "deprecation"})
-@Category({IntegrationTest.class})
+@Timeout(600)
+@Category(IntegrationTest.class)
 public class KStreamAggregationIntegrationTest {
-    @Rule
-    public Timeout globalTimeout = Timeout.seconds(600);
-
     private static final int NUM_BROKERS = 1;
 
     public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
 
-    @BeforeClass
+    @BeforeAll
     public static void startCluster() throws IOException {
         CLUSTER.start();
     }
 
-    @AfterClass
+    @AfterAll
     public static void closeCluster() {
         CLUSTER.stop();
     }
@@ -133,15 +130,12 @@ public class KStreamAggregationIntegrationTest {
     private Aggregator<String, String, Integer> aggregator;
     private KStream<Integer, String> stream;
 
-    @Rule
-    public TestName testName = new TestName();
-
-    @Before
-    public void before() throws InterruptedException {
+    @BeforeEach
+    public void before(final TestInfo testInfo) throws InterruptedException {
         builder = new StreamsBuilder();
-        createTopics();
+        createTopics(testInfo);
         streamsConfiguration = new Properties();
-        final String safeTestName = safeUniqueTestName(getClass(), testName);
+        final String safeTestName = safeUniqueTestName(getClass(), testInfo);
         streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + safeTestName);
         streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
         streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
@@ -160,7 +154,7 @@ public class KStreamAggregationIntegrationTest {
         aggregator = (aggKey, value, aggregate) -> aggregate + value.length();
     }
 
-    @After
+    @AfterEach
     public void whenShuttingDown() throws IOException {
         if (kafkaStreams != null) {
             kafkaStreams.close();
@@ -169,7 +163,7 @@ public class KStreamAggregationIntegrationTest {
     }
 
     @Test
-    public void shouldReduce() throws Exception {
+    public void shouldReduce(final TestInfo testInfo) throws Exception {
         produceMessages(mockTime.milliseconds());
         groupedStream
             .reduce(reducer, Materialized.as("reduce-by-key"))
@@ -183,7 +177,8 @@ public class KStreamAggregationIntegrationTest {
         final List<KeyValueTimestamp<String, String>> results = receiveMessages(
             new StringDeserializer(),
             new StringDeserializer(),
-            10);
+            10,
+            testInfo);
 
         results.sort(KStreamAggregationIntegrationTest::compare);
 
@@ -215,7 +210,7 @@ public class KStreamAggregationIntegrationTest {
 
     @SuppressWarnings("deprecation")
     @Test
-    public void shouldReduceWindowed() throws Exception {
+    public void shouldReduceWindowed(final TestInfo testInfo) throws Exception {
         final long firstBatchTimestamp = mockTime.milliseconds();
         mockTime.sleep(1000);
         produceMessages(firstBatchTimestamp);
@@ -237,7 +232,8 @@ public class KStreamAggregationIntegrationTest {
             new TimeWindowedDeserializer<>(),
             new StringDeserializer(),
             String.class,
-            15);
+            15,
+            testInfo);
 
         // read from ConsoleConsumer
         final String resultFromConsoleConsumer = readWindowedKeyedMessagesViaConsoleConsumer(
@@ -290,7 +286,7 @@ public class KStreamAggregationIntegrationTest {
     }
 
     @Test
-    public void shouldAggregate() throws Exception {
+    public void shouldAggregate(final TestInfo testInfo) throws Exception {
         produceMessages(mockTime.milliseconds());
         groupedStream.aggregate(
             initializer,
@@ -306,7 +302,8 @@ public class KStreamAggregationIntegrationTest {
         final List<KeyValueTimestamp<String, Integer>> results = receiveMessages(
             new StringDeserializer(),
             new IntegerDeserializer(),
-            10);
+            10,
+            testInfo);
 
         results.sort(KStreamAggregationIntegrationTest::compare);
 
@@ -326,7 +323,7 @@ public class KStreamAggregationIntegrationTest {
 
     @SuppressWarnings("deprecation")
     @Test
-    public void shouldAggregateWindowed() throws Exception {
+    public void shouldAggregateWindowed(final TestInfo testInfo) throws Exception {
         final long firstTimestamp = mockTime.milliseconds();
         mockTime.sleep(1000);
         produceMessages(firstTimestamp);
@@ -351,7 +348,8 @@ public class KStreamAggregationIntegrationTest {
             new TimeWindowedDeserializer<>(new StringDeserializer(), 500L),
             new IntegerDeserializer(),
             String.class,
-            15);
+            15,
+            testInfo);
 
         // read from ConsoleConsumer
         final String resultFromConsoleConsumer = readWindowedKeyedMessagesViaConsoleConsumer(
@@ -403,7 +401,7 @@ public class KStreamAggregationIntegrationTest {
 
     }
 
-    private void shouldCountHelper() throws Exception {
+    private void shouldCountHelper(final TestInfo testInfo) throws Exception {
         startStreams();
 
         produceMessages(mockTime.milliseconds());
@@ -411,7 +409,8 @@ public class KStreamAggregationIntegrationTest {
         final List<KeyValueTimestamp<String, Long>> results = receiveMessages(
             new StringDeserializer(),
             new LongDeserializer(),
-            10);
+            10,
+            testInfo);
         results.sort(KStreamAggregationIntegrationTest::compare);
 
         assertThat(results, is(Arrays.asList(
@@ -429,30 +428,30 @@ public class KStreamAggregationIntegrationTest {
     }
 
     @Test
-    public void shouldCount() throws Exception {
+    public void shouldCount(final TestInfo testInfo) throws Exception {
         produceMessages(mockTime.milliseconds());
 
         groupedStream.count(Materialized.as("count-by-key"))
                 .toStream()
                 .to(outputTopic, Produced.with(Serdes.String(), Serdes.Long()));
 
-        shouldCountHelper();
+        shouldCountHelper(testInfo);
     }
 
     @Test
-    public void shouldCountWithInternalStore() throws Exception {
+    public void shouldCountWithInternalStore(final TestInfo testInfo) throws Exception {
         produceMessages(mockTime.milliseconds());
 
         groupedStream.count()
                 .toStream()
                 .to(outputTopic, Produced.with(Serdes.String(), Serdes.Long()));
 
-        shouldCountHelper();
+        shouldCountHelper(testInfo);
     }
 
     @SuppressWarnings("deprecation")
     @Test
-    public void shouldGroupByKey() throws Exception {
+    public void shouldGroupByKey(final TestInfo testInfo) throws Exception {
         final long timestamp = mockTime.milliseconds();
         produceMessages(timestamp);
         produceMessages(timestamp);
@@ -468,7 +467,8 @@ public class KStreamAggregationIntegrationTest {
         final List<KeyValueTimestamp<String, Long>> results = receiveMessages(
             new StringDeserializer(),
             new LongDeserializer(),
-            10);
+            10,
+            testInfo);
         results.sort(KStreamAggregationIntegrationTest::compare);
 
         final long window = timestamp / 500 * 500;
@@ -488,7 +488,7 @@ public class KStreamAggregationIntegrationTest {
 
     @SuppressWarnings("deprecation")
     @Test
-    public void shouldReduceSlidingWindows() throws Exception {
+    public void shouldReduceSlidingWindows(final TestInfo testInfo) throws Exception {
         final long firstBatchTimestamp = mockTime.milliseconds();
         final long timeDifference = 500L;
         produceMessages(firstBatchTimestamp);
@@ -511,7 +511,8 @@ public class KStreamAggregationIntegrationTest {
                 new TimeWindowedDeserializer<>(new StringDeserializer(), 500L),
                 new StringDeserializer(),
                 String.class,
-                30);
+                30,
+                testInfo);
 
         final String resultFromConsoleConsumer = readWindowedKeyedMessagesViaConsoleConsumer(
                 new TimeWindowedDeserializer<String>(),
@@ -594,7 +595,7 @@ public class KStreamAggregationIntegrationTest {
 
     @SuppressWarnings("deprecation")
     @Test
-    public void shouldAggregateSlidingWindows() throws Exception {
+    public void shouldAggregateSlidingWindows(final TestInfo testInfo) throws Exception {
         final long firstBatchTimestamp = mockTime.milliseconds();
         final long timeDifference = 500L;
         produceMessages(firstBatchTimestamp);
@@ -620,7 +621,8 @@ public class KStreamAggregationIntegrationTest {
                 new TimeWindowedDeserializer<>(),
                 new IntegerDeserializer(),
                 String.class,
-                30);
+                30,
+                testInfo);
 
         // read from ConsoleConsumer
         final String resultFromConsoleConsumer = readWindowedKeyedMessagesViaConsoleConsumer(
@@ -1040,8 +1042,8 @@ public class KStreamAggregationIntegrationTest {
     }
 
 
-    private void createTopics() throws InterruptedException {
-        final String safeTestName = safeUniqueTestName(getClass(), testName);
+    private void createTopics(final TestInfo testInfo) throws InterruptedException {
+        final String safeTestName = safeUniqueTestName(getClass(), testInfo);
         streamOneInput = "stream-one-" + safeTestName;
         outputTopic = "output-" + safeTestName;
         userSessionsStream = "user-sessions-" + safeTestName;
@@ -1056,19 +1058,21 @@ public class KStreamAggregationIntegrationTest {
 
     private <K, V> List<KeyValueTimestamp<K, V>> receiveMessages(final Deserializer<K> keyDeserializer,
                                                                  final Deserializer<V> valueDeserializer,
-                                                                 final int numMessages)
+                                                                 final int numMessages,
+                                                                 final TestInfo testInfo)
             throws Exception {
 
-        return receiveMessages(keyDeserializer, valueDeserializer, null, numMessages);
+        return receiveMessages(keyDeserializer, valueDeserializer, null, numMessages, testInfo);
     }
 
     private <K, V> List<KeyValueTimestamp<K, V>> receiveMessages(final Deserializer<K> keyDeserializer,
                                                                  final Deserializer<V> valueDeserializer,
                                                                  final Class innerClass,
-                                                                 final int numMessages)
+                                                                 final int numMessages,
+                                                                 final TestInfo testInfo)
             throws Exception {
 
-        final String safeTestName = safeUniqueTestName(getClass(), testName);
+        final String safeTestName = safeUniqueTestName(getClass(), testInfo);
         final Properties consumerProperties = new Properties();
         consumerProperties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
         consumerProperties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "group-" + safeTestName);
@@ -1090,8 +1094,9 @@ public class KStreamAggregationIntegrationTest {
     private <K, V> List<KeyValueTimestamp<K, V>> receiveMessagesWithTimestamp(final Deserializer<K> keyDeserializer,
                                                                               final Deserializer<V> valueDeserializer,
                                                                               final Class innerClass,
-                                                                              final int numMessages) throws Exception {
-        final String safeTestName = safeUniqueTestName(getClass(), testName);
+                                                                              final int numMessages,
+                                                                              final TestInfo testInfo) throws Exception {
+        final String safeTestName = safeUniqueTestName(getClass(), testInfo);
         final Properties consumerProperties = new Properties();
         consumerProperties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
         consumerProperties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "group-" + safeTestName);
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest.java
index 006dd9275f..b7088fc4e0 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest.java
@@ -53,21 +53,19 @@ import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.utils.UniqueTopicSerdeScope;
 import org.apache.kafka.test.IntegrationTest;
 import org.apache.kafka.test.TestUtils;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
 
 import kafka.utils.MockTime;
-import org.junit.rules.Timeout;
-
-@Category({IntegrationTest.class})
+import org.junit.experimental.categories.Category;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+@Timeout(600)
+@Category(IntegrationTest.class)
 public class KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest {
-    @Rule
-    public Timeout globalTimeout = Timeout.seconds(600);
     private final static int NUM_BROKERS = 1;
 
     public final static EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
@@ -86,7 +84,7 @@ public class KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest {
     private final static Properties PRODUCER_CONFIG_1 = new Properties();
     private final static Properties PRODUCER_CONFIG_2 = new Properties();
 
-    @BeforeClass
+    @BeforeAll
     public static void startCluster() throws IOException, InterruptedException {
         CLUSTER.start();
         //Use multiple partitions to ensure distribution of keys.
@@ -125,12 +123,12 @@ public class KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest {
         CONSUMER_CONFIG.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
     }
 
-    @AfterClass
+    @AfterAll
     public static void closeCluster() {
         CLUSTER.stop();
     }
 
-    @Before
+    @BeforeEach
     public void before() throws IOException {
         final String stateDirBasePath = TestUtils.tempDirectory().getPath();
         streamsConfig.put(StreamsConfig.STATE_DIR_CONFIG, stateDirBasePath + "-1");
@@ -138,7 +136,7 @@ public class KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest {
         streamsConfigThree.put(StreamsConfig.STATE_DIR_CONFIG, stateDirBasePath + "-3");
     }
 
-    @After
+    @AfterEach
     public void after() throws IOException {
         if (streams != null) {
             streams.close();
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KTableSourceTopicRestartIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KTableSourceTopicRestartIntegrationTest.java
index 6b98d77589..5892cdc1b6 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KTableSourceTopicRestartIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KTableSourceTopicRestartIntegrationTest.java
@@ -35,17 +35,17 @@ import org.apache.kafka.streams.processor.StateRestoreListener;
 import org.apache.kafka.streams.processor.WallclockTimestampExtractor;
 import org.apache.kafka.test.IntegrationTest;
 import org.apache.kafka.test.TestUtils;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Rule;
-import org.junit.Test;
 import org.junit.experimental.categories.Category;
-import org.junit.rules.TestName;
-import org.junit.rules.Timeout;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.api.Timeout;
 
 import java.io.IOException;
+import java.lang.reflect.Method;
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -54,10 +54,9 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.ConcurrentHashMap;
 
-@Category({IntegrationTest.class})
+@Timeout(600)
+@Category(IntegrationTest.class)
 public class KTableSourceTopicRestartIntegrationTest {
-    @Rule
-    public Timeout globalTimeout = Timeout.seconds(600);
     private static final int NUM_BROKERS = 3;
     private static final String SOURCE_TOPIC = "source-topic";
     private static final Properties PRODUCER_CONFIG = new Properties();
@@ -65,7 +64,7 @@ public class KTableSourceTopicRestartIntegrationTest {
 
     public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
 
-    @BeforeClass
+    @BeforeAll
     public static void startCluster() throws IOException {
         CLUSTER.start();
         STREAMS_CONFIG.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
@@ -84,7 +83,7 @@ public class KTableSourceTopicRestartIntegrationTest {
         PRODUCER_CONFIG.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
     }
 
-    @AfterClass
+    @AfterAll
     public static void closeCluster() {
         CLUSTER.stop();
     }
@@ -99,15 +98,12 @@ public class KTableSourceTopicRestartIntegrationTest {
     private Map<String, String> expectedInitialResultsMap;
     private Map<String, String> expectedResultsWithDataWrittenDuringRestoreMap;
 
-    @Rule
-    public TestName testName = new TestName();
-
-    @Before
-    public void before() throws Exception {
-        sourceTopic = SOURCE_TOPIC + "-" + testName.getMethodName();
+    @BeforeEach
+    public void before(final TestInfo testInfo) throws Exception {
+        sourceTopic = SOURCE_TOPIC + "-" + testInfo.getTestMethod().map(Method::getName).orElseThrow(IllegalStateException::new);
         CLUSTER.createTopic(sourceTopic);
 
-        STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, IntegrationTestUtils.safeUniqueTestName(getClass(), testName));
+        STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, IntegrationTestUtils.safeUniqueTestName(getClass(), testInfo));
 
         final KTable<String, String> kTable = streamsBuilder.table(sourceTopic, Materialized.as("store"));
         kTable.toStream().foreach(readKeyValues::put);
@@ -116,7 +112,7 @@ public class KTableSourceTopicRestartIntegrationTest {
         expectedResultsWithDataWrittenDuringRestoreMap = createExpectedResultsMap("a", "b", "c", "d", "f", "g", "h");
     }
 
-    @After
+    @AfterEach
     public void after() throws Exception {
         IntegrationTestUtils.purgeLocalStreamsState(STREAMS_CONFIG);
     }
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RackAwarenessIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/RackAwarenessIntegrationTest.java
index 54736c44a2..cb21db9d88 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/RackAwarenessIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/RackAwarenessIntegrationTest.java
@@ -33,14 +33,13 @@ import org.apache.kafka.streams.state.StoreBuilder;
 import org.apache.kafka.streams.state.Stores;
 import org.apache.kafka.test.IntegrationTest;
 import org.apache.kafka.test.TestUtils;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Rule;
-import org.junit.Test;
 import org.junit.experimental.categories.Category;
-import org.junit.rules.TestName;
-import org.junit.rules.Timeout;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.api.Timeout;
 
 import java.io.IOException;
 import java.time.Duration;
@@ -57,13 +56,12 @@ import java.util.stream.Stream;
 import static java.util.Arrays.asList;
 import static java.util.Collections.singletonList;
 import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
-@Category({IntegrationTest.class})
+@Timeout(600)
+@Category(IntegrationTest.class)
 public class RackAwarenessIntegrationTest {
-    @Rule
-    public Timeout globalTimeout = Timeout.seconds(600);
     private static final int NUM_BROKERS = 1;
 
     private static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
@@ -78,9 +76,6 @@ public class RackAwarenessIntegrationTest {
     private static final int DEFAULT_NUMBER_OF_STATEFUL_SUB_TOPOLOGIES = 1;
     private static final int DEFAULT_NUMBER_OF_PARTITIONS_OF_SUB_TOPOLOGIES = 2;
 
-    @Rule
-    public TestName testName = new TestName();
-
     private static final String INPUT_TOPIC = "input-topic";
 
     private static final String TAG_ZONE = "zone";
@@ -90,17 +85,17 @@ public class RackAwarenessIntegrationTest {
     private Properties baseConfiguration;
     private Topology topology;
 
-    @BeforeClass
+    @BeforeAll
     public static void createTopics() throws Exception {
         CLUSTER.start();
         CLUSTER.createTopic(INPUT_TOPIC, 6, 1);
     }
 
-    @Before
-    public void setup() {
+    @BeforeEach
+    public void setup(final TestInfo testInfo) {
         kafkaStreamsInstances = new ArrayList<>();
         baseConfiguration = new Properties();
-        final String safeTestName = safeUniqueTestName(getClass(), testName);
+        final String safeTestName = safeUniqueTestName(getClass(), testInfo);
         final String applicationId = "app-" + safeTestName;
         baseConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
         baseConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
@@ -109,7 +104,7 @@ public class RackAwarenessIntegrationTest {
         baseConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
     }
 
-    @After
+    @AfterEach
     public void cleanup() throws IOException {
         for (final KafkaStreamsWithConfiguration kafkaStreamsWithConfiguration : kafkaStreamsInstances) {
             kafkaStreamsWithConfiguration.kafkaStreams.close(Duration.ofMillis(IntegrationTestUtils.DEFAULT_TIMEOUT));
@@ -139,10 +134,10 @@ public class RackAwarenessIntegrationTest {
 
         assertEquals(StreamsConfig.MAX_RACK_AWARE_ASSIGNMENT_TAG_LIST_SIZE, clientTagKeys.size());
         Stream.of(clientTags1, clientTags2)
-              .forEach(clientTags -> assertEquals(String.format("clientsTags with content '%s' " +
-                                                                "did not match expected size", clientTags),
-                                                  StreamsConfig.MAX_RACK_AWARE_ASSIGNMENT_TAG_LIST_SIZE,
-                                                  clientTags.size()));
+              .forEach(clientTags -> assertEquals(StreamsConfig.MAX_RACK_AWARE_ASSIGNMENT_TAG_LIST_SIZE,
+                                                  clientTags.size(),
+                                                  String.format("clientsTags with content '%s' " +
+                                                          "did not match expected size", clientTags)));
 
         createAndStart(clientTags1, clientTagKeys, numberOfStandbyReplicas);
         createAndStart(clientTags1, clientTagKeys, numberOfStandbyReplicas);
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
index dc0be1a1c7..243264e3fa 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
@@ -56,13 +56,14 @@ import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
 import org.apache.kafka.test.IntegrationTest;
 import org.apache.kafka.test.TestUtils;
 import org.hamcrest.CoreMatchers;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Rule;
-import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.api.Timeout;
 
 import java.io.File;
 import java.io.IOException;
@@ -74,8 +75,6 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
-import org.junit.rules.TestName;
-import org.junit.rules.Timeout;
 
 import static java.util.Arrays.asList;
 import static java.util.Collections.singletonList;
@@ -87,37 +86,34 @@ import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.wa
 import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForStandbyCompletion;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.core.IsEqual.equalTo;
-import static org.junit.Assert.assertTrue;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
-@Category({IntegrationTest.class})
+@Timeout(600)
+@Category(IntegrationTest.class)
 public class RestoreIntegrationTest {
-    @Rule
-    public Timeout globalTimeout = Timeout.seconds(600);
     private static final int NUM_BROKERS = 1;
 
     public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
 
-    @BeforeClass
+    @BeforeAll
     public static void startCluster() throws IOException {
         CLUSTER.start();
     }
 
-    @AfterClass
+    @AfterAll
     public static void closeCluster() {
         CLUSTER.stop();
     }
 
-    @Rule
-    public final TestName testName = new TestName();
     private String appId;
     private String inputStream;
 
     private final int numberOfKeys = 10000;
     private KafkaStreams kafkaStreams;
 
-    @Before
-    public void createTopics() throws InterruptedException {
-        appId = safeUniqueTestName(RestoreIntegrationTest.class, testName);
+    @BeforeEach
+    public void createTopics(final TestInfo testInfo) throws InterruptedException {
+        appId = safeUniqueTestName(RestoreIntegrationTest.class, testInfo);
         inputStream = appId + "-input-stream";
         CLUSTER.createTopic(inputStream, 2, 1);
     }
@@ -135,7 +131,7 @@ public class RestoreIntegrationTest {
         return streamsConfiguration;
     }
 
-    @After
+    @AfterEach
     public void shutdown() {
         if (kafkaStreams != null) {
             kafkaStreams.close(Duration.ofSeconds(30));
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
index 31baae96ba..689b1c0beb 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
@@ -58,6 +58,7 @@ import org.apache.kafka.streams.query.StateQueryResult;
 import org.apache.kafka.streams.state.QueryableStoreType;
 import org.apache.kafka.test.TestCondition;
 import org.apache.kafka.test.TestUtils;
+import org.junit.jupiter.api.TestInfo;
 import org.junit.rules.TestName;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -66,6 +67,7 @@ import scala.Option;
 import java.io.File;
 import java.io.IOException;
 import java.lang.reflect.Field;
+import java.lang.reflect.Method;
 import java.nio.file.Paths;
 import java.time.Duration;
 import java.util.ArrayList;
@@ -225,9 +227,23 @@ public class IntegrationTestUtils {
     /**
      * Gives a test name that is safe to be used in application ids, topic names, etc.
      * The name is safe even for parameterized methods.
+     * Used by tests not yet migrated from JUnit 4.
      */
     public static String safeUniqueTestName(final Class<?> testClass, final TestName testName) {
-        return (testClass.getSimpleName() + testName.getMethodName())
+        return safeUniqueTestName(testClass, testName.getMethodName());
+    }
+
+    /**
+     * Same as {@link IntegrationTestUtils#safeUniqueTestName(Class, TestName)} except that it accepts
+     * a {@link TestInfo} passed in by JUnit 5 instead of a {@link TestName} from JUnit 4.
+     * Used by tests migrated to JUnit 5.
+     */
+    public static String safeUniqueTestName(final Class<?> testClass, final TestInfo testInfo) {
+        return safeUniqueTestName(testClass, testInfo.getTestMethod().map(Method::getName).orElse(""));
+    }
+
+    private static String safeUniqueTestName(final Class<?> testClass, final String methodName) {
+        return (testClass.getSimpleName() + methodName)
                 .replace(':', '_')
                 .replace('.', '_')
                 .replace('[', '_')