You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by vv...@apache.org on 2020/04/29 22:12:17 UTC

[kafka] branch trunk updated: KAFKA-9875: Make integration tests more resilient (#8578)

This is an automated email from the ASF dual-hosted git repository.

vvcephei pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new dc4d439  KAFKA-9875: Make integration tests more resilient (#8578)
dc4d439 is described below

commit dc4d439825b2d117707b01c7c64769e700246fc6
Author: John Roesler <vv...@users.noreply.github.com>
AuthorDate: Wed Apr 29 17:11:49 2020 -0500

    KAFKA-9875: Make integration tests more resilient (#8578)
    
    Reviewers: Guozhang Wang <wa...@gmail.com>
---
 .../org/apache/kafka/streams/KafkaStreamsTest.java | 28 ++++++++++++----------
 .../EOSUncleanShutdownIntegrationTest.java         |  4 ++--
 .../GlobalKTableEOSIntegrationTest.java            | 16 +++++--------
 .../integration/GlobalKTableIntegrationTest.java   | 14 ++++++-----
 .../integration/GlobalThreadShutDownOrderTest.java |  5 ++--
 .../KStreamAggregationDedupIntegrationTest.java    | 13 ++++++----
 .../KStreamAggregationIntegrationTest.java         | 20 +++++++++-------
 .../KStreamRepartitionIntegrationTest.java         | 18 +++++++-------
 .../KTableKTableForeignKeyJoinIntegrationTest.java |  4 +++-
 ...reignKeyJoinMaterializationIntegrationTest.java |  4 +++-
 .../integration/LagFetchIntegrationTest.java       | 14 ++++++-----
 .../integration/MetricsIntegrationTest.java        |  8 +++----
 .../OptimizedKTableIntegrationTest.java            |  9 +++----
 .../integration/QueryableStateIntegrationTest.java | 26 ++++++++++----------
 .../ResetPartitionTimeIntegrationTest.java         |  9 +++----
 .../integration/RocksDBMetricsIntegrationTest.java |  9 ++++---
 .../StandbyTaskCreationIntegrationTest.java        |  8 ++++---
 .../integration/StandbyTaskEOSIntegrationTest.java | 13 +++++-----
 .../integration/StoreQueryIntegrationTest.java     | 12 +++++-----
 .../integration/StoreUpgradeIntegrationTest.java   | 14 ++++++-----
 .../SuppressionDurabilityIntegrationTest.java      | 18 ++++++++++----
 .../integration/SuppressionIntegrationTest.java    | 16 ++++++-------
 .../integration/utils/IntegrationTestUtils.java    | 25 +++++++++++++++----
 .../KTableKTableForeignKeyJoinScenarioTest.java    |  4 +++-
 .../org/apache/kafka/test/StreamsTestUtils.java    |  4 +++-
 25 files changed, 183 insertions(+), 132 deletions(-)

diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
index 05271e8..9c7c01f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
@@ -84,6 +84,7 @@ import java.util.concurrent.atomic.AtomicReference;
 
 import static java.util.Collections.emptyList;
 import static java.util.Collections.singletonList;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
 import static org.easymock.EasyMock.anyInt;
 import static org.easymock.EasyMock.anyLong;
 import static org.easymock.EasyMock.anyObject;
@@ -848,8 +849,9 @@ public class KafkaStreamsTest {
 
     @Test
     public void statelessTopologyShouldNotCreateStateDirectory() throws Exception {
-        final String inputTopic = testName.getMethodName() + "-input";
-        final String outputTopic = testName.getMethodName() + "-output";
+        final String safeTestName = safeUniqueTestName(getClass(), testName);
+        final String inputTopic = safeTestName + "-input";
+        final String outputTopic = safeTestName + "-output";
         final Topology topology = new Topology();
         topology.addSource("source", Serdes.String().deserializer(), Serdes.String().deserializer(), inputTopic)
                 .addProcessor("process", () -> new AbstractProcessor<String, String>() {
@@ -866,22 +868,24 @@ public class KafkaStreamsTest {
 
     @Test
     public void inMemoryStatefulTopologyShouldNotCreateStateDirectory() throws Exception {
-        final String inputTopic = testName.getMethodName() + "-input";
-        final String outputTopic = testName.getMethodName() + "-output";
-        final String globalTopicName = testName.getMethodName() + "-global";
-        final String storeName = testName.getMethodName() + "-counts";
-        final String globalStoreName = testName.getMethodName() + "-globalStore";
+        final String safeTestName = safeUniqueTestName(getClass(), testName);
+        final String inputTopic = safeTestName + "-input";
+        final String outputTopic = safeTestName + "-output";
+        final String globalTopicName = safeTestName + "-global";
+        final String storeName = safeTestName + "-counts";
+        final String globalStoreName = safeTestName + "-globalStore";
         final Topology topology = getStatefulTopology(inputTopic, outputTopic, globalTopicName, storeName, globalStoreName, false);
         startStreamsAndCheckDirExists(topology, false);
     }
 
     @Test
     public void statefulTopologyShouldCreateStateDirectory() throws Exception {
-        final String inputTopic = testName.getMethodName() + "-input";
-        final String outputTopic = testName.getMethodName() + "-output";
-        final String globalTopicName = testName.getMethodName() + "-global";
-        final String storeName = testName.getMethodName() + "-counts";
-        final String globalStoreName = testName.getMethodName() + "-globalStore";
+        final String safeTestName = safeUniqueTestName(getClass(), testName);
+        final String inputTopic = safeTestName + "-input";
+        final String outputTopic = safeTestName + "-output";
+        final String globalTopicName = safeTestName + "-global";
+        final String storeName = safeTestName + "-counts";
+        final String globalStoreName = safeTestName + "-globalStore";
         final Topology topology = getStatefulTopology(inputTopic, outputTopic, globalTopicName, storeName, globalStoreName, true);
         startStreamsAndCheckDirExists(topology, true);
     }
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/EOSUncleanShutdownIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/EOSUncleanShutdownIntegrationTest.java
index 128aef7..a186c57 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/EOSUncleanShutdownIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/EOSUncleanShutdownIntegrationTest.java
@@ -53,8 +53,8 @@ import static java.util.Collections.singletonList;
 import static org.apache.kafka.common.utils.Utils.mkEntry;
 import static org.apache.kafka.common.utils.Utils.mkMap;
 import static org.apache.kafka.common.utils.Utils.mkProperties;
-import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.cleanStateAfterTest;
 import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.cleanStateBeforeTest;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.quietlyCleanStateAfterTest;
 import static org.junit.Assert.assertFalse;
 
 
@@ -158,7 +158,7 @@ public class EOSUncleanShutdownIntegrationTest {
             // the state directory should still exist with the empty checkpoint file
             assertFalse(stateDir.exists());
 
-            cleanStateAfterTest(CLUSTER, driver);
+            quietlyCleanStateAfterTest(CLUSTER, driver);
         }
     }
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableEOSIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableEOSIntegrationTest.java
index 2850a51..0d5c1b1 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableEOSIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableEOSIntegrationTest.java
@@ -58,6 +58,7 @@ import java.util.Iterator;
 import java.util.Map;
 import java.util.Properties;
 
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
 import static org.junit.Assert.assertNotNull;
 
 @RunWith(Parameterized.class)
@@ -108,10 +109,8 @@ public class GlobalKTableEOSIntegrationTest {
         builder = new StreamsBuilder();
         createTopics();
         streamsConfiguration = new Properties();
-        final String applicationId = "globalTable-eos-test-" + testName.getMethodName()
-            .replace('[', '_')
-            .replace(']', '_');
-        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
+        final String safeTestName = safeUniqueTestName(getClass(), testName);
+        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + safeTestName);
         streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
         streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
         streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
@@ -291,12 +290,9 @@ public class GlobalKTableEOSIntegrationTest {
     }
 
     private void createTopics() throws Exception {
-        final String suffix = testName.getMethodName()
-            .replace('[', '_')
-            .replace(']', '_');
-        streamTopic = "stream-" + suffix;
-        globalTableTopic = "globalTable-" + suffix;
-        CLUSTER.deleteAllTopicsAndWait(300_000L);
+        final String safeTestName = safeUniqueTestName(getClass(), testName);
+        streamTopic = "stream-" + safeTestName;
+        globalTableTopic = "globalTable-" + safeTestName;
         CLUSTER.createTopics(streamTopic);
         CLUSTER.createTopic(globalTableTopic, 2, 1);
     }
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
index 3e4c07f..ed44c17 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
@@ -16,7 +16,6 @@
  */
 package org.apache.kafka.streams.integration;
 
-import java.time.Duration;
 import kafka.utils.MockTime;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.common.serialization.LongSerializer;
@@ -39,8 +38,8 @@ import org.apache.kafka.streams.kstream.ValueJoiner;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.QueryableStoreTypes;
 import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
-import org.apache.kafka.streams.state.ValueAndTimestamp;
 import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
 import org.apache.kafka.test.IntegrationTest;
 import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.TestUtils;
@@ -52,12 +51,14 @@ import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.rules.TestName;
 
+import java.time.Duration;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Properties;
 
 import static java.util.Collections.singletonList;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
 import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForApplicationState;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.core.IsEqual.equalTo;
@@ -91,8 +92,8 @@ public class GlobalKTableIntegrationTest {
         builder = new StreamsBuilder();
         createTopics();
         streamsConfiguration = new Properties();
-        final String applicationId = "globalTableTopic-table-test-" + testName.getMethodName();
-        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
+        final String safeTestName = safeUniqueTestName(getClass(), testName);
+        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + safeTestName);
         streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
         streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
         streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
@@ -300,8 +301,9 @@ public class GlobalKTableIntegrationTest {
     }
 
     private void createTopics() throws Exception {
-        streamTopic = "stream-" + testName.getMethodName();
-        globalTableTopic = "globalTable-" + testName.getMethodName();
+        final String safeTestName = safeUniqueTestName(getClass(), testName);
+        streamTopic = "stream-" + safeTestName;
+        globalTableTopic = "globalTable-" + safeTestName;
         CLUSTER.createTopics(streamTopic);
         CLUSTER.createTopic(globalTableTopic, 2, 1);
     }
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalThreadShutDownOrderTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalThreadShutDownOrderTest.java
index 6adc6f3..17db9fc 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalThreadShutDownOrderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalThreadShutDownOrderTest.java
@@ -54,6 +54,7 @@ import java.util.List;
 import java.util.Properties;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
 import static org.junit.Assert.assertEquals;
 
 
@@ -101,8 +102,8 @@ public class GlobalThreadShutDownOrderTest {
         builder = new StreamsBuilder();
         createTopics();
         streamsConfiguration = new Properties();
-        final String applicationId = "global-thread-shutdown-test" + testName.getMethodName();
-        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
+        final String safeTestName = safeUniqueTestName(getClass(), testName);
+        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + safeTestName);
         streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
         streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
         streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java
index 912e497..dbe0b04 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java
@@ -57,6 +57,7 @@ import java.util.List;
 import java.util.Properties;
 
 import static java.time.Duration.ofMillis;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
 
 /**
  * Similar to KStreamAggregationIntegrationTest but with dedupping enabled
@@ -88,8 +89,8 @@ public class KStreamAggregationDedupIntegrationTest {
         builder = new StreamsBuilder();
         createTopics();
         streamsConfiguration = new Properties();
-        final String applicationId = "kgrouped-stream-test-" + testName.getMethodName();
-        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
+        final String safeTestName = safeUniqueTestName(getClass(), testName);
+        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + safeTestName);
         streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
         streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
         streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
@@ -222,8 +223,9 @@ public class KStreamAggregationDedupIntegrationTest {
 
 
     private void createTopics() throws InterruptedException {
-        streamOneInput = "stream-one-" + testName.getMethodName();
-        outputTopic = "output-" + testName.getMethodName();
+        final String safeTestName = safeUniqueTestName(getClass(), testName);
+        streamOneInput = "stream-one-" + safeTestName;
+        outputTopic = "output-" + safeTestName;
         CLUSTER.createTopic(streamOneInput, 3, 1);
         CLUSTER.createTopic(outputTopic);
     }
@@ -238,9 +240,10 @@ public class KStreamAggregationDedupIntegrationTest {
                                                  final Deserializer<V> valueDeserializer,
                                                  final List<KeyValueTimestamp<K, V>> expectedRecords)
         throws InterruptedException {
+        final String safeTestName = safeUniqueTestName(getClass(), testName);
         final Properties consumerProperties = new Properties();
         consumerProperties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
-        consumerProperties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "kgroupedstream-test-" + testName.getMethodName());
+        consumerProperties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "group-" + safeTestName);
         consumerProperties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
         consumerProperties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer.getClass().getName());
         consumerProperties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer.getClass().getName());
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
index 1d5ab8e..e04e25e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
@@ -88,6 +88,7 @@ import java.util.concurrent.TimeUnit;
 
 import static java.time.Duration.ofMillis;
 import static java.time.Instant.ofEpochMilli;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.core.Is.is;
@@ -107,7 +108,7 @@ public class KStreamAggregationIntegrationTest {
     private Properties streamsConfiguration;
     private KafkaStreams kafkaStreams;
     private String streamOneInput;
-    private String userSessionsStream = "user-sessions";
+    private String userSessionsStream;
     private String outputTopic;
     private KGroupedStream<String, String> groupedStream;
     private Reducer<String> reducer;
@@ -123,8 +124,8 @@ public class KStreamAggregationIntegrationTest {
         builder = new StreamsBuilder();
         createTopics();
         streamsConfiguration = new Properties();
-        final String applicationId = "kgrouped-stream-test-" + testName.getMethodName();
-        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
+        final String safeTestName = safeUniqueTestName(getClass(), testName);
+        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + safeTestName);
         streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
         streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
         streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
@@ -791,9 +792,10 @@ public class KStreamAggregationIntegrationTest {
 
 
     private void createTopics() throws InterruptedException {
-        streamOneInput = "stream-one-" + testName.getMethodName();
-        outputTopic = "output-" + testName.getMethodName();
-        userSessionsStream = userSessionsStream + "-" + testName.getMethodName();
+        final String safeTestName = safeUniqueTestName(getClass(), testName);
+        streamOneInput = "stream-one-" + safeTestName;
+        outputTopic = "output-" + safeTestName;
+        userSessionsStream = "user-sessions-" + safeTestName;
         CLUSTER.createTopic(streamOneInput, 3, 1);
         CLUSTER.createTopics(userSessionsStream, outputTopic);
     }
@@ -814,9 +816,10 @@ public class KStreamAggregationIntegrationTest {
                                                                  final Deserializer<V> valueDeserializer,
                                                                  final Class innerClass,
                                                                  final int numMessages) throws InterruptedException {
+        final String safeTestName = safeUniqueTestName(getClass(), testName);
         final Properties consumerProperties = new Properties();
         consumerProperties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
-        consumerProperties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "kgroupedstream-test-" + testName.getMethodName());
+        consumerProperties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "group-" + safeTestName);
         consumerProperties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
         consumerProperties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer.getClass().getName());
         consumerProperties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer.getClass().getName());
@@ -835,9 +838,10 @@ public class KStreamAggregationIntegrationTest {
                                                                                               final Deserializer<V> valueDeserializer,
                                                                                               final Class innerClass,
                                                                                               final int numMessages) throws InterruptedException {
+        final String safeTestName = safeUniqueTestName(getClass(), testName);
         final Properties consumerProperties = new Properties();
         consumerProperties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
-        consumerProperties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "kgroupedstream-test-" + testName.getMethodName());
+        consumerProperties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "group-" + safeTestName);
         consumerProperties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
         consumerProperties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer.getClass().getName());
         consumerProperties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer.getClass().getName());
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionIntegrationTest.java
index 2cd75d1..6bb1269 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionIntegrationTest.java
@@ -77,6 +77,7 @@ import java.util.regex.Pattern;
 import static org.apache.kafka.streams.KafkaStreams.State.ERROR;
 import static org.apache.kafka.streams.KafkaStreams.State.REBALANCING;
 import static org.apache.kafka.streams.KafkaStreams.State.RUNNING;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
@@ -116,16 +117,12 @@ public class KStreamRepartitionIntegrationTest {
         streamsConfiguration = new Properties();
         kafkaStreamsInstances = new ArrayList<>();
 
-        final String suffix = testName.getMethodName()
-            .replace('[', '_')
-            .replace(']', '_')
-            .replace(' ', '_')
-            .replace('=', '_');
+        final String safeTestName = safeUniqueTestName(getClass(), testName);
 
-        topicB = "topic-b-" + suffix;
-        inputTopic = "input-topic-" + suffix;
-        outputTopic = "output-topic-" + suffix;
-        applicationId = "kstream-repartition-stream-test-" + suffix;
+        topicB = "topic-b-" + safeTestName;
+        inputTopic = "input-topic-" + safeTestName;
+        outputTopic = "output-topic-" + safeTestName;
+        applicationId = "app-" + safeTestName;
 
         CLUSTER.createTopic(inputTopic, 4, 1);
         CLUSTER.createTopic(outputTopic, 1, 1);
@@ -812,9 +809,10 @@ public class KStreamRepartitionIntegrationTest {
                                                  final Deserializer<V> valueSerializer,
                                                  final List<KeyValue<K, V>> expectedRecords) throws InterruptedException {
 
+        final String safeTestName = safeUniqueTestName(getClass(), testName);
         final Properties consumerProperties = new Properties();
         consumerProperties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
-        consumerProperties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "kstream-repartition-test-" + testName.getMethodName());
+        consumerProperties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "group-" + safeTestName);
         consumerProperties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
         consumerProperties.setProperty(
             ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java
index bca7c79..34cf1c1 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java
@@ -54,6 +54,7 @@ import static java.util.Collections.emptyMap;
 import static org.apache.kafka.common.utils.Utils.mkEntry;
 import static org.apache.kafka.common.utils.Utils.mkMap;
 import static org.apache.kafka.common.utils.Utils.mkProperties;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
 
@@ -87,8 +88,9 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
 
     @Before
     public void before() {
+        final String safeTestName = safeUniqueTestName(getClass(), testName);
         streamsConfig = mkProperties(mkMap(
-            mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, "ktable-ktable-joinOnForeignKey-" + testName.getMethodName()),
+            mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + safeTestName),
             mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "asdf:0000"),
             mkEntry(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()),
             mkEntry(StreamsConfig.TOPOLOGY_OPTIMIZATION, optimization)
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinMaterializationIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinMaterializationIntegrationTest.java
index 65b872b..11aa616 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinMaterializationIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinMaterializationIntegrationTest.java
@@ -50,6 +50,7 @@ import static java.util.Collections.emptyMap;
 import static org.apache.kafka.common.utils.Utils.mkEntry;
 import static org.apache.kafka.common.utils.Utils.mkMap;
 import static org.apache.kafka.common.utils.Utils.mkProperties;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
 
@@ -75,8 +76,9 @@ public class KTableKTableForeignKeyJoinMaterializationIntegrationTest {
 
     @Before
     public void before() {
+        final String safeTestName = safeUniqueTestName(getClass(), testName);
         streamsConfig = mkProperties(mkMap(
-            mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, "ktable-ktable-joinOnForeignKey-" + testName.getMethodName()),
+            mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + safeTestName),
             mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "asdf:0000"),
             mkEntry(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath())
         ));
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/LagFetchIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/LagFetchIntegrationTest.java
index f5143e1..2e6d6c0 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/LagFetchIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/LagFetchIntegrationTest.java
@@ -66,6 +66,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 
 import static org.apache.kafka.common.utils.Utils.mkSet;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
 import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.startApplicationAndWaitUntilRunning;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.core.IsEqual.equalTo;
@@ -88,16 +89,17 @@ public class LagFetchIntegrationTest {
     private String stateStoreName;
 
     @Rule
-    public TestName name = new TestName();
+    public TestName testName = new TestName();
 
     @Before
     public void before() {
-        inputTopicName = "input-topic-" + name.getMethodName();
-        outputTopicName = "output-topic-" + name.getMethodName();
-        stateStoreName = "lagfetch-test-store" + name.getMethodName();
+        final String safeTestName = safeUniqueTestName(getClass(), testName);
+        inputTopicName = "input-topic-" + safeTestName;
+        outputTopicName = "output-topic-" + safeTestName;
+        stateStoreName = "lagfetch-test-store" + safeTestName;
 
         streamsConfiguration = new Properties();
-        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "lag-fetch-" + name.getMethodName());
+        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + safeTestName);
         streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
         streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
         streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
@@ -106,7 +108,7 @@ public class LagFetchIntegrationTest {
 
         consumerConfiguration = new Properties();
         consumerConfiguration.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
-        consumerConfiguration.setProperty(ConsumerConfig.GROUP_ID_CONFIG, name.getMethodName() + "-consumer");
+        consumerConfiguration.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "group-" + safeTestName);
         consumerConfiguration.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
         consumerConfiguration.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
         consumerConfiguration.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java
index d15ab1b..21000c4 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java
@@ -59,6 +59,7 @@ import java.util.List;
 import java.util.Properties;
 import java.util.stream.Collectors;
 
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
 
@@ -72,8 +73,6 @@ public class MetricsIntegrationTest {
     public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
     private final long timeout = 60000;
 
-    private final static String APPLICATION_ID_VALUE = "stream-metrics-test";
-
     // Metric group
     private static final String STREAM_CLIENT_NODE_METRICS = "stream-metrics";
     private static final String STREAM_THREAD_NODE_METRICS_0100_TO_24 = "stream-metrics";
@@ -227,14 +226,15 @@ public class MetricsIntegrationTest {
     private String appId;
 
     @Rule
-    public TestName name = new TestName();
+    public TestName testName = new TestName();
 
     @Before
     public void before() throws InterruptedException {
         builder = new StreamsBuilder();
         CLUSTER.createTopics(STREAM_INPUT, STREAM_OUTPUT_1, STREAM_OUTPUT_2, STREAM_OUTPUT_3, STREAM_OUTPUT_4);
 
-        appId = APPLICATION_ID_VALUE + "-" + name.getMethodName();
+        final String safeTestName = safeUniqueTestName(getClass(), testName);
+        appId = "app-" + safeTestName;
 
         streamsConfiguration = new Properties();
         streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, appId);
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/OptimizedKTableIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/OptimizedKTableIntegrationTest.java
index d342631..092cffc 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/OptimizedKTableIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/OptimizedKTableIntegrationTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.integration;
 
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
 import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.startApplicationAndWaitUntilRunning;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.equalTo;
@@ -71,7 +72,7 @@ public class OptimizedKTableIntegrationTest {
     public final EmbeddedKafkaCluster cluster = new EmbeddedKafkaCluster(NUM_BROKERS);
 
     @Rule
-    public final TestName name = new TestName();
+    public final TestName testName = new TestName();
 
     private final List<KafkaStreams> streamsToCleanup = new ArrayList<>();
     private final MockTime mockTime = cluster.time;
@@ -215,13 +216,13 @@ public class OptimizedKTableIntegrationTest {
     }
 
     private Properties streamsConfiguration() {
-        final String applicationId = "streamsApp";
+        final String safeTestName = safeUniqueTestName(getClass(), testName);
         final Properties config = new Properties();
         config.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE);
-        config.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId + name.getMethodName());
+        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + safeTestName);
         config.put(StreamsConfig.APPLICATION_SERVER_CONFIG, "localhost:" + (++port));
         config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers());
-        config.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory(applicationId).getPath());
+        config.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
         config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
         config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
         config.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
index b3929f6..eb33107 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
@@ -98,6 +98,7 @@ import static java.time.Duration.ofMillis;
 import static java.time.Duration.ofSeconds;
 import static java.time.Instant.ofEpochMilli;
 import static org.apache.kafka.common.utils.Utils.mkSet;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
 import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.startApplicationAndWaitUntilRunning;
 import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForApplicationState;
 import static org.apache.kafka.test.StreamsTestUtils.startKafkaStreamsAndWaitForRunningState;
@@ -140,14 +141,15 @@ public class QueryableStateIntegrationTest {
     private Comparator<KeyValue<String, Long>> stringLongComparator;
 
     private void createTopics() throws Exception {
-        streamOne = streamOne + "-" + name.getMethodName();
-        streamConcurrent = streamConcurrent + "-" + name.getMethodName();
-        streamThree = streamThree + "-" + name.getMethodName();
-        outputTopic = outputTopic + "-" + name.getMethodName();
-        outputTopicConcurrent = outputTopicConcurrent + "-" + name.getMethodName();
-        outputTopicConcurrentWindowed = outputTopicConcurrentWindowed + "-" + name.getMethodName();
-        outputTopicThree = outputTopicThree + "-" + name.getMethodName();
-        streamTwo = streamTwo + "-" + name.getMethodName();
+        final String safeTestName = safeUniqueTestName(getClass(), testName);
+        streamOne = streamOne + "-" + safeTestName;
+        streamConcurrent = streamConcurrent + "-" + safeTestName;
+        streamThree = streamThree + "-" + safeTestName;
+        outputTopic = outputTopic + "-" + safeTestName;
+        outputTopicConcurrent = outputTopicConcurrent + "-" + safeTestName;
+        outputTopicConcurrentWindowed = outputTopicConcurrentWindowed + "-" + safeTestName;
+        outputTopicThree = outputTopicThree + "-" + safeTestName;
+        streamTwo = streamTwo + "-" + safeTestName;
         CLUSTER.createTopics(streamOne, streamConcurrent);
         CLUSTER.createTopic(streamTwo, STREAM_TWO_PARTITIONS, NUM_REPLICAS);
         CLUSTER.createTopic(streamThree, STREAM_THREE_PARTITIONS, 1);
@@ -191,18 +193,18 @@ public class QueryableStateIntegrationTest {
     }
 
     @Rule
-    public TestName name = new TestName();
+    public TestName testName = new TestName();
 
     @Before
     public void before() throws Exception {
         createTopics();
         streamsConfiguration = new Properties();
-        final String applicationId = "queryable-state-" + name.getMethodName();
+        final String safeTestName = safeUniqueTestName(getClass(), testName);
 
-        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
+        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + safeTestName);
         streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
         streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-        streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory("state-" + applicationId).getPath());
+        streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
         streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
         streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
         streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/ResetPartitionTimeIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/ResetPartitionTimeIntegrationTest.java
index 0ed2972..460448e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/ResetPartitionTimeIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/ResetPartitionTimeIntegrationTest.java
@@ -53,9 +53,10 @@ import static java.util.Arrays.asList;
 import static org.apache.kafka.common.utils.Utils.mkEntry;
 import static org.apache.kafka.common.utils.Utils.mkMap;
 import static org.apache.kafka.common.utils.Utils.mkProperties;
-import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.cleanStateAfterTest;
 import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.cleanStateBeforeTest;
 import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.getStartedStreams;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.quietlyCleanStateAfterTest;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
 
@@ -92,11 +93,11 @@ public class ResetPartitionTimeIntegrationTest {
     public String processingGuarantee;
 
     @Rule
-    public TestName name = new TestName();
+    public TestName testName = new TestName();
 
     @Test
     public void shouldPreservePartitionTimeOnKafkaStreamRestart() {
-        final String appId = name.getMethodName();
+        final String appId = "app-" + safeUniqueTestName(getClass(), testName);
         final String input = "input";
         final String outputRaw = "output-raw";
 
@@ -156,7 +157,7 @@ public class ResetPartitionTimeIntegrationTest {
             assertThat(lastRecordedTimestamp, is(5000L));
         } finally {
             kafkaStreams.close();
-            cleanStateAfterTest(CLUSTER, kafkaStreams);
+            quietlyCleanStateAfterTest(CLUSTER, kafkaStreams);
         }
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RocksDBMetricsIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/RocksDBMetricsIntegrationTest.java
index 394eeed..af7dad0 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/RocksDBMetricsIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/RocksDBMetricsIntegrationTest.java
@@ -60,6 +60,7 @@ import java.util.List;
 import java.util.Properties;
 import java.util.stream.Collectors;
 
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.notNullValue;
@@ -113,7 +114,7 @@ public class RocksDBMetricsIntegrationTest {
     public String processingGuarantee;
 
     @Rule
-    public TestName name = new TestName();
+    public TestName testName = new TestName();
 
     @Before
     public void before() throws Exception {
@@ -154,10 +155,8 @@ public class RocksDBMetricsIntegrationTest {
 
     private Properties streamsConfig() {
         final Properties streamsConfiguration = new Properties();
-        final String suffix = name.getMethodName()
-            .replace('[', '_')
-            .replace(']', '_');
-        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "test-application-" + suffix);
+        final String safeTestName = safeUniqueTestName(getClass(), testName);
+        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "test-application-" + safeTestName);
         streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
         streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
         streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskCreationIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskCreationIntegrationTest.java
index 28b5dc5..3bf2d8f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskCreationIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskCreationIntegrationTest.java
@@ -45,6 +45,8 @@ import org.junit.rules.TestName;
 import java.util.Properties;
 import java.util.function.Predicate;
 
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
+
 @Category({IntegrationTest.class})
 public class StandbyTaskCreationIntegrationTest {
 
@@ -75,11 +77,11 @@ public class StandbyTaskCreationIntegrationTest {
     }
 
     private Properties streamsConfiguration() {
-        final String applicationId = "testApp" + testName.getMethodName();
+        final String safeTestName = safeUniqueTestName(getClass(), testName);
         final Properties streamsConfiguration = new Properties();
-        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
+        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + safeTestName);
         streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
-        streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory(applicationId).getPath());
+        streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
         streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
         streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
         streamsConfiguration.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSIntegrationTest.java
index e65c0fc..8fa3913 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSIntegrationTest.java
@@ -51,6 +51,7 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
 import static org.apache.kafka.test.TestUtils.waitForCondition;
 import static org.junit.Assert.assertTrue;
 
@@ -72,8 +73,8 @@ public class StandbyTaskEOSIntegrationTest {
     @Parameterized.Parameter
     public String eosConfig;
 
-    private final String appId = "eos-test-app";
-    private final String inputTopic = "input";
+    private String appId;
+    private String inputTopic;
 
     @ClassRule
     public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(3);
@@ -83,6 +84,9 @@ public class StandbyTaskEOSIntegrationTest {
 
     @Before
     public void createTopics() throws Exception {
+        final String safeTestName = safeUniqueTestName(getClass(), testName);
+        appId = "app-" + safeTestName;
+        inputTopic = "input-" + safeTestName;
         CLUSTER.deleteTopicsAndWait(inputTopic, appId + "-KSTREAM-AGGREGATE-STATE-STORE-0000000001-changelog");
         CLUSTER.createTopic(inputTopic, 1, 3);
     }
@@ -159,10 +163,7 @@ public class StandbyTaskEOSIntegrationTest {
 
     private Properties props(final String stateDirPath) {
         final Properties streamsConfiguration = new Properties();
-        final String suffix = testName.getMethodName()
-            .replace('[', '_')
-            .replace(']', '_');
-        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, appId + suffix);
+        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, appId);
         streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
         streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
         streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, stateDirPath);
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java
index 68195b5..6829177 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java
@@ -24,10 +24,10 @@ import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KeyQueryMetadata;
-import org.apache.kafka.streams.StreamsBuilder;
-import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StoreQueryParameters;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
 import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
 import org.apache.kafka.streams.kstream.Consumed;
@@ -38,7 +38,6 @@ import org.apache.kafka.streams.state.QueryableStoreTypes;
 import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
 import org.apache.kafka.test.IntegrationTest;
 import org.apache.kafka.test.TestUtils;
-
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Rule;
@@ -56,6 +55,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
 import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.startApplicationAndWaitUntilRunning;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.equalTo;
@@ -318,13 +318,13 @@ public class StoreQueryIntegrationTest {
     }
 
     private Properties streamsConfiguration() {
-        final String applicationId = "streamsApp" + testName.getMethodName();
+        final String safeTestName = safeUniqueTestName(getClass(), testName);
         final Properties config = new Properties();
         config.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE);
-        config.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
+        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + safeTestName);
         config.put(StreamsConfig.APPLICATION_SERVER_CONFIG, "localhost:" + (++port));
         config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers());
-        config.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory(applicationId).getPath());
+        config.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
         config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
         config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
         config.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/StoreUpgradeIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/StoreUpgradeIntegrationTest.java
index 9db1e6b..153f434 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/StoreUpgradeIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/StoreUpgradeIntegrationTest.java
@@ -56,29 +56,31 @@ import java.util.Properties;
 
 import static java.util.Arrays.asList;
 import static java.util.Collections.singletonList;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
 
 @Category({IntegrationTest.class})
 public class StoreUpgradeIntegrationTest {
-    private static String inputStream;
     private static final String STORE_NAME = "store";
+    private String inputStream;
 
     private KafkaStreams kafkaStreams;
 
     @ClassRule
     public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);
 
+    @Rule
+    public TestName testName = new TestName();
+
     @Before
     public void createTopics() throws Exception {
-        inputStream = "input-stream-" + testName.getMethodName();
+        inputStream = "input-stream-" + safeUniqueTestName(getClass(), testName);
         CLUSTER.createTopic(inputStream);
     }
 
-    @Rule
-    public TestName testName = new TestName();
-
     private Properties props() {
         final Properties streamsConfiguration = new Properties();
-        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "addId-" + testName.getMethodName());
+        final String safeTestName = safeUniqueTestName(getClass(), testName);
+        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + safeTestName);
         streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
         streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
         streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionDurabilityIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionDurabilityIntegrationTest.java
index beb9ec7..93941d0 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionDurabilityIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionDurabilityIntegrationTest.java
@@ -45,8 +45,10 @@ import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.test.IntegrationTest;
 import org.apache.kafka.test.TestUtils;
 import org.junit.ClassRule;
+import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 import org.slf4j.Logger;
@@ -56,7 +58,6 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashSet;
 import java.util.List;
-import java.util.Locale;
 import java.util.Optional;
 import java.util.Properties;
 import java.util.Set;
@@ -69,9 +70,10 @@ import static java.util.Arrays.asList;
 import static org.apache.kafka.common.utils.Utils.mkEntry;
 import static org.apache.kafka.common.utils.Utils.mkMap;
 import static org.apache.kafka.common.utils.Utils.mkProperties;
-import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.cleanStateAfterTest;
 import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.cleanStateBeforeTest;
 import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.getStartedStreams;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.quietlyCleanStateAfterTest;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
 import static org.apache.kafka.streams.kstream.Suppressed.BufferConfig.maxRecords;
 import static org.apache.kafka.streams.kstream.Suppressed.untilTimeLimit;
 import static org.hamcrest.CoreMatchers.is;
@@ -81,12 +83,18 @@ import static org.hamcrest.Matchers.equalTo;
 @RunWith(Parameterized.class)
 @Category({IntegrationTest.class})
 public class SuppressionDurabilityIntegrationTest {
+    private static final Logger LOG = LoggerFactory.getLogger(SuppressionDurabilityIntegrationTest.class);
+
     @ClassRule
     public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(
         3,
         mkProperties(mkMap()),
         0L
     );
+
+    @Rule
+    public TestName testName = new TestName();
+
     private static final StringDeserializer STRING_DESERIALIZER = new StringDeserializer();
     private static final StringSerializer STRING_SERIALIZER = new StringSerializer();
     private static final Serde<String> STRING_SERDE = Serdes.String();
@@ -107,8 +115,8 @@ public class SuppressionDurabilityIntegrationTest {
 
     @Test
     public void shouldRecoverBufferAfterShutdown() {
-        final String testId = "-shouldRecoverBufferAfterShutdown";
-        final String appId = getClass().getSimpleName().toLowerCase(Locale.getDefault()) + testId;
+        final String testId = safeUniqueTestName(getClass(), testName);
+        final String appId = "appId_" + testId;
         final String input = "input" + testId;
         final String storeName = "counts";
         final String outputSuppressed = "output-suppressed" + testId;
@@ -243,7 +251,7 @@ public class SuppressionDurabilityIntegrationTest {
 
         } finally {
             driver.close();
-            cleanStateAfterTest(CLUSTER, driver);
+            quietlyCleanStateAfterTest(CLUSTER, driver);
         }
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionIntegrationTest.java
index 8c3be2f..9d7c23d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionIntegrationTest.java
@@ -66,8 +66,8 @@ import static org.apache.kafka.common.utils.Utils.mkMap;
 import static org.apache.kafka.common.utils.Utils.mkProperties;
 import static org.apache.kafka.streams.StreamsConfig.AT_LEAST_ONCE;
 import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.DEFAULT_TIMEOUT;
-import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.cleanStateAfterTest;
 import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.cleanStateBeforeTest;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.quietlyCleanStateAfterTest;
 import static org.apache.kafka.streams.kstream.Suppressed.BufferConfig.maxBytes;
 import static org.apache.kafka.streams.kstream.Suppressed.BufferConfig.maxRecords;
 import static org.apache.kafka.streams.kstream.Suppressed.untilTimeLimit;
@@ -150,7 +150,7 @@ public class SuppressionIntegrationTest {
             assertThat(suppressedRecords, is(true));
         } finally {
             driver.close();
-            cleanStateAfterTest(CLUSTER, driver);
+            quietlyCleanStateAfterTest(CLUSTER, driver);
         }
     }
 
@@ -203,7 +203,7 @@ public class SuppressionIntegrationTest {
             assertThat(suppressedRecords, is(true));
         } finally {
             driver.close();
-            cleanStateAfterTest(CLUSTER, driver);
+            quietlyCleanStateAfterTest(CLUSTER, driver);
         }
     }
 
@@ -272,7 +272,7 @@ public class SuppressionIntegrationTest {
             verifyErrorShutdown(driver);
         } finally {
             driver.close();
-            cleanStateAfterTest(CLUSTER, driver);
+            quietlyCleanStateAfterTest(CLUSTER, driver);
         }
     }
 
@@ -314,7 +314,7 @@ public class SuppressionIntegrationTest {
             verifyErrorShutdown(driver);
         } finally {
             driver.close();
-            cleanStateAfterTest(CLUSTER, driver);
+            quietlyCleanStateAfterTest(CLUSTER, driver);
         }
     }
 
@@ -374,7 +374,7 @@ public class SuppressionIntegrationTest {
             assertThat(suppressedRecords, is(true));
         } finally {
             driver.close();
-            cleanStateAfterTest(CLUSTER, driver);
+            quietlyCleanStateAfterTest(CLUSTER, driver);
         }
     }
 
@@ -430,7 +430,7 @@ public class SuppressionIntegrationTest {
             assertThat(suppressedRecords, is(true));
         } finally {
             driver.close();
-            cleanStateAfterTest(CLUSTER, driver);
+            quietlyCleanStateAfterTest(CLUSTER, driver);
         }
     }
 
@@ -492,7 +492,7 @@ public class SuppressionIntegrationTest {
             assertThat(suppressedRecords, is(true));
         } finally {
             driver.close();
-            cleanStateAfterTest(CLUSTER, driver);
+            quietlyCleanStateAfterTest(CLUSTER, driver);
         }
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
index 122c0e9..eb7fa4c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
@@ -50,6 +50,9 @@ import org.apache.kafka.streams.processor.internals.ThreadStateTransitionValidat
 import org.apache.kafka.streams.state.QueryableStoreType;
 import org.apache.kafka.test.TestCondition;
 import org.apache.kafka.test.TestUtils;
+import org.junit.rules.TestName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import scala.Option;
 
 import java.io.File;
@@ -91,6 +94,7 @@ import static org.junit.Assert.fail;
 public class IntegrationTestUtils {
 
     public static final long DEFAULT_TIMEOUT = 60 * 1000L;
+    private static final Logger LOG = LoggerFactory.getLogger(IntegrationTestUtils.class);
 
     /*
      * Records state transition for StreamThread
@@ -112,6 +116,19 @@ public class IntegrationTestUtils {
     }
 
     /**
+     * Gives a test name that is safe to be used in application ids, topic names, etc.
+     * The name is safe even for parameterized methods.
+     */
+    public static String safeUniqueTestName(final Class<?> testClass, final TestName testName) {
+        return (testClass.getSimpleName() + testName.getMethodName())
+                .replace('.', '_')
+                .replace('[', '_')
+                .replace(']', '_')
+                .replace(' ', '_')
+                .replace('=', '_');
+    }
+
+    /**
      * Removes local state stores. Useful to reset state in-between integration test runs.
      *
      * @param streamsConfiguration Streams configuration settings
@@ -145,12 +162,12 @@ public class IntegrationTestUtils {
         }
     }
 
-    public static void cleanStateAfterTest(final EmbeddedKafkaCluster cluster, final KafkaStreams driver) {
-        driver.cleanUp();
+    public static void quietlyCleanStateAfterTest(final EmbeddedKafkaCluster cluster, final KafkaStreams driver) {
         try {
+            driver.cleanUp();
             cluster.deleteAllTopicsAndWait(DEFAULT_TIMEOUT);
-        } catch (final InterruptedException e) {
-            throw new RuntimeException(e);
+        } catch (final RuntimeException | InterruptedException e) {
+            LOG.warn("Ignoring failure to clean test state", e);
         }
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableForeignKeyJoinScenarioTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableForeignKeyJoinScenarioTest.java
index 1a43a2f..ab84e05 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableForeignKeyJoinScenarioTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableForeignKeyJoinScenarioTest.java
@@ -45,6 +45,7 @@ import static org.apache.kafka.common.utils.Utils.mkEntry;
 import static org.apache.kafka.common.utils.Utils.mkMap;
 import static org.apache.kafka.common.utils.Utils.mkProperties;
 import static org.apache.kafka.common.utils.Utils.mkSet;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.is;
 
@@ -239,7 +240,8 @@ public class KTableKTableForeignKeyJoinScenarioTest {
 
     private void validateTopologyCanProcessData(final StreamsBuilder builder) {
         final Properties config = new Properties();
-        config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "dummy-" + testName.getMethodName());
+        final String safeTestName = safeUniqueTestName(getClass(), testName);
+        config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "dummy-" + safeTestName);
         config.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy");
         config.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class.getName());
         config.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class.getName());
diff --git a/streams/src/test/java/org/apache/kafka/test/StreamsTestUtils.java b/streams/src/test/java/org/apache/kafka/test/StreamsTestUtils.java
index 50e3de7..afc3785 100644
--- a/streams/src/test/java/org/apache/kafka/test/StreamsTestUtils.java
+++ b/streams/src/test/java/org/apache/kafka/test/StreamsTestUtils.java
@@ -106,7 +106,9 @@ public final class StreamsTestUtils {
         kafkaStreams.start();
         assertThat(
             "KafkaStreams did not transit to RUNNING state within " + timeoutMs + " milli seconds.",
-            countDownLatch.await(timeoutMs, TimeUnit.MILLISECONDS), equalTo(true));
+            countDownLatch.await(timeoutMs, TimeUnit.MILLISECONDS),
+            equalTo(true)
+        );
     }
 
     public static <K, V> List<KeyValue<K, V>> toList(final Iterator<KeyValue<K, V>> iterator) {