You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by vv...@apache.org on 2020/04/29 22:12:17 UTC
[kafka] branch trunk updated: KAFKA-9875: Make integration tests more resilient (#8578)
This is an automated email from the ASF dual-hosted git repository.
vvcephei pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new dc4d439 KAFKA-9875: Make integration tests more resilient (#8578)
dc4d439 is described below
commit dc4d439825b2d117707b01c7c64769e700246fc6
Author: John Roesler <vv...@users.noreply.github.com>
AuthorDate: Wed Apr 29 17:11:49 2020 -0500
KAFKA-9875: Make integration tests more resilient (#8578)
Reviewers: Guozhang Wang <wa...@gmail.com>
---
.../org/apache/kafka/streams/KafkaStreamsTest.java | 28 ++++++++++++----------
.../EOSUncleanShutdownIntegrationTest.java | 4 ++--
.../GlobalKTableEOSIntegrationTest.java | 16 +++++--------
.../integration/GlobalKTableIntegrationTest.java | 14 ++++++-----
.../integration/GlobalThreadShutDownOrderTest.java | 5 ++--
.../KStreamAggregationDedupIntegrationTest.java | 13 ++++++----
.../KStreamAggregationIntegrationTest.java | 20 +++++++++-------
.../KStreamRepartitionIntegrationTest.java | 18 +++++++-------
.../KTableKTableForeignKeyJoinIntegrationTest.java | 4 +++-
...reignKeyJoinMaterializationIntegrationTest.java | 4 +++-
.../integration/LagFetchIntegrationTest.java | 14 ++++++-----
.../integration/MetricsIntegrationTest.java | 8 +++----
.../OptimizedKTableIntegrationTest.java | 9 +++----
.../integration/QueryableStateIntegrationTest.java | 26 ++++++++++----------
.../ResetPartitionTimeIntegrationTest.java | 9 +++----
.../integration/RocksDBMetricsIntegrationTest.java | 9 ++++---
.../StandbyTaskCreationIntegrationTest.java | 8 ++++---
.../integration/StandbyTaskEOSIntegrationTest.java | 13 +++++-----
.../integration/StoreQueryIntegrationTest.java | 12 +++++-----
.../integration/StoreUpgradeIntegrationTest.java | 14 ++++++-----
.../SuppressionDurabilityIntegrationTest.java | 18 ++++++++++----
.../integration/SuppressionIntegrationTest.java | 16 ++++++-------
.../integration/utils/IntegrationTestUtils.java | 25 +++++++++++++++----
.../KTableKTableForeignKeyJoinScenarioTest.java | 4 +++-
.../org/apache/kafka/test/StreamsTestUtils.java | 4 +++-
25 files changed, 183 insertions(+), 132 deletions(-)
diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
index 05271e8..9c7c01f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
@@ -84,6 +84,7 @@ import java.util.concurrent.atomic.AtomicReference;
import static java.util.Collections.emptyList;
import static java.util.Collections.singletonList;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
import static org.easymock.EasyMock.anyInt;
import static org.easymock.EasyMock.anyLong;
import static org.easymock.EasyMock.anyObject;
@@ -848,8 +849,9 @@ public class KafkaStreamsTest {
@Test
public void statelessTopologyShouldNotCreateStateDirectory() throws Exception {
- final String inputTopic = testName.getMethodName() + "-input";
- final String outputTopic = testName.getMethodName() + "-output";
+ final String safeTestName = safeUniqueTestName(getClass(), testName);
+ final String inputTopic = safeTestName + "-input";
+ final String outputTopic = safeTestName + "-output";
final Topology topology = new Topology();
topology.addSource("source", Serdes.String().deserializer(), Serdes.String().deserializer(), inputTopic)
.addProcessor("process", () -> new AbstractProcessor<String, String>() {
@@ -866,22 +868,24 @@ public class KafkaStreamsTest {
@Test
public void inMemoryStatefulTopologyShouldNotCreateStateDirectory() throws Exception {
- final String inputTopic = testName.getMethodName() + "-input";
- final String outputTopic = testName.getMethodName() + "-output";
- final String globalTopicName = testName.getMethodName() + "-global";
- final String storeName = testName.getMethodName() + "-counts";
- final String globalStoreName = testName.getMethodName() + "-globalStore";
+ final String safeTestName = safeUniqueTestName(getClass(), testName);
+ final String inputTopic = safeTestName + "-input";
+ final String outputTopic = safeTestName + "-output";
+ final String globalTopicName = safeTestName + "-global";
+ final String storeName = safeTestName + "-counts";
+ final String globalStoreName = safeTestName + "-globalStore";
final Topology topology = getStatefulTopology(inputTopic, outputTopic, globalTopicName, storeName, globalStoreName, false);
startStreamsAndCheckDirExists(topology, false);
}
@Test
public void statefulTopologyShouldCreateStateDirectory() throws Exception {
- final String inputTopic = testName.getMethodName() + "-input";
- final String outputTopic = testName.getMethodName() + "-output";
- final String globalTopicName = testName.getMethodName() + "-global";
- final String storeName = testName.getMethodName() + "-counts";
- final String globalStoreName = testName.getMethodName() + "-globalStore";
+ final String safeTestName = safeUniqueTestName(getClass(), testName);
+ final String inputTopic = safeTestName + "-input";
+ final String outputTopic = safeTestName + "-output";
+ final String globalTopicName = safeTestName + "-global";
+ final String storeName = safeTestName + "-counts";
+ final String globalStoreName = safeTestName + "-globalStore";
final Topology topology = getStatefulTopology(inputTopic, outputTopic, globalTopicName, storeName, globalStoreName, true);
startStreamsAndCheckDirExists(topology, true);
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/EOSUncleanShutdownIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/EOSUncleanShutdownIntegrationTest.java
index 128aef7..a186c57 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/EOSUncleanShutdownIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/EOSUncleanShutdownIntegrationTest.java
@@ -53,8 +53,8 @@ import static java.util.Collections.singletonList;
import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.apache.kafka.common.utils.Utils.mkProperties;
-import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.cleanStateAfterTest;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.cleanStateBeforeTest;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.quietlyCleanStateAfterTest;
import static org.junit.Assert.assertFalse;
@@ -158,7 +158,7 @@ public class EOSUncleanShutdownIntegrationTest {
// the state directory should still exist with the empty checkpoint file
assertFalse(stateDir.exists());
- cleanStateAfterTest(CLUSTER, driver);
+ quietlyCleanStateAfterTest(CLUSTER, driver);
}
}
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableEOSIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableEOSIntegrationTest.java
index 2850a51..0d5c1b1 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableEOSIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableEOSIntegrationTest.java
@@ -58,6 +58,7 @@ import java.util.Iterator;
import java.util.Map;
import java.util.Properties;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
import static org.junit.Assert.assertNotNull;
@RunWith(Parameterized.class)
@@ -108,10 +109,8 @@ public class GlobalKTableEOSIntegrationTest {
builder = new StreamsBuilder();
createTopics();
streamsConfiguration = new Properties();
- final String applicationId = "globalTable-eos-test-" + testName.getMethodName()
- .replace('[', '_')
- .replace(']', '_');
- streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
+ final String safeTestName = safeUniqueTestName(getClass(), testName);
+ streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + safeTestName);
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
@@ -291,12 +290,9 @@ public class GlobalKTableEOSIntegrationTest {
}
private void createTopics() throws Exception {
- final String suffix = testName.getMethodName()
- .replace('[', '_')
- .replace(']', '_');
- streamTopic = "stream-" + suffix;
- globalTableTopic = "globalTable-" + suffix;
- CLUSTER.deleteAllTopicsAndWait(300_000L);
+ final String safeTestName = safeUniqueTestName(getClass(), testName);
+ streamTopic = "stream-" + safeTestName;
+ globalTableTopic = "globalTable-" + safeTestName;
CLUSTER.createTopics(streamTopic);
CLUSTER.createTopic(globalTableTopic, 2, 1);
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
index 3e4c07f..ed44c17 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
@@ -16,7 +16,6 @@
*/
package org.apache.kafka.streams.integration;
-import java.time.Duration;
import kafka.utils.MockTime;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.LongSerializer;
@@ -39,8 +38,8 @@ import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
-import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.TestUtils;
@@ -52,12 +51,14 @@ import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
+import java.time.Duration;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import static java.util.Collections.singletonList;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForApplicationState;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.IsEqual.equalTo;
@@ -91,8 +92,8 @@ public class GlobalKTableIntegrationTest {
builder = new StreamsBuilder();
createTopics();
streamsConfiguration = new Properties();
- final String applicationId = "globalTableTopic-table-test-" + testName.getMethodName();
- streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
+ final String safeTestName = safeUniqueTestName(getClass(), testName);
+ streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + safeTestName);
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
@@ -300,8 +301,9 @@ public class GlobalKTableIntegrationTest {
}
private void createTopics() throws Exception {
- streamTopic = "stream-" + testName.getMethodName();
- globalTableTopic = "globalTable-" + testName.getMethodName();
+ final String safeTestName = safeUniqueTestName(getClass(), testName);
+ streamTopic = "stream-" + safeTestName;
+ globalTableTopic = "globalTable-" + safeTestName;
CLUSTER.createTopics(streamTopic);
CLUSTER.createTopic(globalTableTopic, 2, 1);
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalThreadShutDownOrderTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalThreadShutDownOrderTest.java
index 6adc6f3..17db9fc 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalThreadShutDownOrderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalThreadShutDownOrderTest.java
@@ -54,6 +54,7 @@ import java.util.List;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicInteger;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
import static org.junit.Assert.assertEquals;
@@ -101,8 +102,8 @@ public class GlobalThreadShutDownOrderTest {
builder = new StreamsBuilder();
createTopics();
streamsConfiguration = new Properties();
- final String applicationId = "global-thread-shutdown-test" + testName.getMethodName();
- streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
+ final String safeTestName = safeUniqueTestName(getClass(), testName);
+ streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + safeTestName);
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java
index 912e497..dbe0b04 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java
@@ -57,6 +57,7 @@ import java.util.List;
import java.util.Properties;
import static java.time.Duration.ofMillis;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
/**
* Similar to KStreamAggregationIntegrationTest but with dedupping enabled
@@ -88,8 +89,8 @@ public class KStreamAggregationDedupIntegrationTest {
builder = new StreamsBuilder();
createTopics();
streamsConfiguration = new Properties();
- final String applicationId = "kgrouped-stream-test-" + testName.getMethodName();
- streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
+ final String safeTestName = safeUniqueTestName(getClass(), testName);
+ streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + safeTestName);
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
@@ -222,8 +223,9 @@ public class KStreamAggregationDedupIntegrationTest {
private void createTopics() throws InterruptedException {
- streamOneInput = "stream-one-" + testName.getMethodName();
- outputTopic = "output-" + testName.getMethodName();
+ final String safeTestName = safeUniqueTestName(getClass(), testName);
+ streamOneInput = "stream-one-" + safeTestName;
+ outputTopic = "output-" + safeTestName;
CLUSTER.createTopic(streamOneInput, 3, 1);
CLUSTER.createTopic(outputTopic);
}
@@ -238,9 +240,10 @@ public class KStreamAggregationDedupIntegrationTest {
final Deserializer<V> valueDeserializer,
final List<KeyValueTimestamp<K, V>> expectedRecords)
throws InterruptedException {
+ final String safeTestName = safeUniqueTestName(getClass(), testName);
final Properties consumerProperties = new Properties();
consumerProperties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
- consumerProperties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "kgroupedstream-test-" + testName.getMethodName());
+ consumerProperties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "group-" + safeTestName);
consumerProperties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
consumerProperties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer.getClass().getName());
consumerProperties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer.getClass().getName());
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
index 1d5ab8e..e04e25e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
@@ -88,6 +88,7 @@ import java.util.concurrent.TimeUnit;
import static java.time.Duration.ofMillis;
import static java.time.Instant.ofEpochMilli;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.Is.is;
@@ -107,7 +108,7 @@ public class KStreamAggregationIntegrationTest {
private Properties streamsConfiguration;
private KafkaStreams kafkaStreams;
private String streamOneInput;
- private String userSessionsStream = "user-sessions";
+ private String userSessionsStream;
private String outputTopic;
private KGroupedStream<String, String> groupedStream;
private Reducer<String> reducer;
@@ -123,8 +124,8 @@ public class KStreamAggregationIntegrationTest {
builder = new StreamsBuilder();
createTopics();
streamsConfiguration = new Properties();
- final String applicationId = "kgrouped-stream-test-" + testName.getMethodName();
- streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
+ final String safeTestName = safeUniqueTestName(getClass(), testName);
+ streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + safeTestName);
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
@@ -791,9 +792,10 @@ public class KStreamAggregationIntegrationTest {
private void createTopics() throws InterruptedException {
- streamOneInput = "stream-one-" + testName.getMethodName();
- outputTopic = "output-" + testName.getMethodName();
- userSessionsStream = userSessionsStream + "-" + testName.getMethodName();
+ final String safeTestName = safeUniqueTestName(getClass(), testName);
+ streamOneInput = "stream-one-" + safeTestName;
+ outputTopic = "output-" + safeTestName;
+ userSessionsStream = "user-sessions-" + safeTestName;
CLUSTER.createTopic(streamOneInput, 3, 1);
CLUSTER.createTopics(userSessionsStream, outputTopic);
}
@@ -814,9 +816,10 @@ public class KStreamAggregationIntegrationTest {
final Deserializer<V> valueDeserializer,
final Class innerClass,
final int numMessages) throws InterruptedException {
+ final String safeTestName = safeUniqueTestName(getClass(), testName);
final Properties consumerProperties = new Properties();
consumerProperties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
- consumerProperties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "kgroupedstream-test-" + testName.getMethodName());
+ consumerProperties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "group-" + safeTestName);
consumerProperties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
consumerProperties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer.getClass().getName());
consumerProperties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer.getClass().getName());
@@ -835,9 +838,10 @@ public class KStreamAggregationIntegrationTest {
final Deserializer<V> valueDeserializer,
final Class innerClass,
final int numMessages) throws InterruptedException {
+ final String safeTestName = safeUniqueTestName(getClass(), testName);
final Properties consumerProperties = new Properties();
consumerProperties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
- consumerProperties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "kgroupedstream-test-" + testName.getMethodName());
+ consumerProperties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "group-" + safeTestName);
consumerProperties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
consumerProperties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer.getClass().getName());
consumerProperties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer.getClass().getName());
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionIntegrationTest.java
index 2cd75d1..6bb1269 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionIntegrationTest.java
@@ -77,6 +77,7 @@ import java.util.regex.Pattern;
import static org.apache.kafka.streams.KafkaStreams.State.ERROR;
import static org.apache.kafka.streams.KafkaStreams.State.REBALANCING;
import static org.apache.kafka.streams.KafkaStreams.State.RUNNING;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
@@ -116,16 +117,12 @@ public class KStreamRepartitionIntegrationTest {
streamsConfiguration = new Properties();
kafkaStreamsInstances = new ArrayList<>();
- final String suffix = testName.getMethodName()
- .replace('[', '_')
- .replace(']', '_')
- .replace(' ', '_')
- .replace('=', '_');
+ final String safeTestName = safeUniqueTestName(getClass(), testName);
- topicB = "topic-b-" + suffix;
- inputTopic = "input-topic-" + suffix;
- outputTopic = "output-topic-" + suffix;
- applicationId = "kstream-repartition-stream-test-" + suffix;
+ topicB = "topic-b-" + safeTestName;
+ inputTopic = "input-topic-" + safeTestName;
+ outputTopic = "output-topic-" + safeTestName;
+ applicationId = "app-" + safeTestName;
CLUSTER.createTopic(inputTopic, 4, 1);
CLUSTER.createTopic(outputTopic, 1, 1);
@@ -812,9 +809,10 @@ public class KStreamRepartitionIntegrationTest {
final Deserializer<V> valueSerializer,
final List<KeyValue<K, V>> expectedRecords) throws InterruptedException {
+ final String safeTestName = safeUniqueTestName(getClass(), testName);
final Properties consumerProperties = new Properties();
consumerProperties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
- consumerProperties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "kstream-repartition-test-" + testName.getMethodName());
+ consumerProperties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "group-" + safeTestName);
consumerProperties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
consumerProperties.setProperty(
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java
index bca7c79..34cf1c1 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java
@@ -54,6 +54,7 @@ import static java.util.Collections.emptyMap;
import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.apache.kafka.common.utils.Utils.mkProperties;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
@@ -87,8 +88,9 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
@Before
public void before() {
+ final String safeTestName = safeUniqueTestName(getClass(), testName);
streamsConfig = mkProperties(mkMap(
- mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, "ktable-ktable-joinOnForeignKey-" + testName.getMethodName()),
+ mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + safeTestName),
mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "asdf:0000"),
mkEntry(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()),
mkEntry(StreamsConfig.TOPOLOGY_OPTIMIZATION, optimization)
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinMaterializationIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinMaterializationIntegrationTest.java
index 65b872b..11aa616 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinMaterializationIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinMaterializationIntegrationTest.java
@@ -50,6 +50,7 @@ import static java.util.Collections.emptyMap;
import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.apache.kafka.common.utils.Utils.mkProperties;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
@@ -75,8 +76,9 @@ public class KTableKTableForeignKeyJoinMaterializationIntegrationTest {
@Before
public void before() {
+ final String safeTestName = safeUniqueTestName(getClass(), testName);
streamsConfig = mkProperties(mkMap(
- mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, "ktable-ktable-joinOnForeignKey-" + testName.getMethodName()),
+ mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + safeTestName),
mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "asdf:0000"),
mkEntry(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath())
));
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/LagFetchIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/LagFetchIntegrationTest.java
index f5143e1..2e6d6c0 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/LagFetchIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/LagFetchIntegrationTest.java
@@ -66,6 +66,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import static org.apache.kafka.common.utils.Utils.mkSet;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.startApplicationAndWaitUntilRunning;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.IsEqual.equalTo;
@@ -88,16 +89,17 @@ public class LagFetchIntegrationTest {
private String stateStoreName;
@Rule
- public TestName name = new TestName();
+ public TestName testName = new TestName();
@Before
public void before() {
- inputTopicName = "input-topic-" + name.getMethodName();
- outputTopicName = "output-topic-" + name.getMethodName();
- stateStoreName = "lagfetch-test-store" + name.getMethodName();
+ final String safeTestName = safeUniqueTestName(getClass(), testName);
+ inputTopicName = "input-topic-" + safeTestName;
+ outputTopicName = "output-topic-" + safeTestName;
+ stateStoreName = "lagfetch-test-store" + safeTestName;
streamsConfiguration = new Properties();
- streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "lag-fetch-" + name.getMethodName());
+ streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + safeTestName);
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
@@ -106,7 +108,7 @@ public class LagFetchIntegrationTest {
consumerConfiguration = new Properties();
consumerConfiguration.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
- consumerConfiguration.setProperty(ConsumerConfig.GROUP_ID_CONFIG, name.getMethodName() + "-consumer");
+ consumerConfiguration.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "group-" + safeTestName);
consumerConfiguration.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
consumerConfiguration.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerConfiguration.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java
index d15ab1b..21000c4 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java
@@ -59,6 +59,7 @@ import java.util.List;
import java.util.Properties;
import java.util.stream.Collectors;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
@@ -72,8 +73,6 @@ public class MetricsIntegrationTest {
public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
private final long timeout = 60000;
- private final static String APPLICATION_ID_VALUE = "stream-metrics-test";
-
// Metric group
private static final String STREAM_CLIENT_NODE_METRICS = "stream-metrics";
private static final String STREAM_THREAD_NODE_METRICS_0100_TO_24 = "stream-metrics";
@@ -227,14 +226,15 @@ public class MetricsIntegrationTest {
private String appId;
@Rule
- public TestName name = new TestName();
+ public TestName testName = new TestName();
@Before
public void before() throws InterruptedException {
builder = new StreamsBuilder();
CLUSTER.createTopics(STREAM_INPUT, STREAM_OUTPUT_1, STREAM_OUTPUT_2, STREAM_OUTPUT_3, STREAM_OUTPUT_4);
- appId = APPLICATION_ID_VALUE + "-" + name.getMethodName();
+ final String safeTestName = safeUniqueTestName(getClass(), testName);
+ appId = "app-" + safeTestName;
streamsConfiguration = new Properties();
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, appId);
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/OptimizedKTableIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/OptimizedKTableIntegrationTest.java
index d342631..092cffc 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/OptimizedKTableIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/OptimizedKTableIntegrationTest.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.integration;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.startApplicationAndWaitUntilRunning;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
@@ -71,7 +72,7 @@ public class OptimizedKTableIntegrationTest {
public final EmbeddedKafkaCluster cluster = new EmbeddedKafkaCluster(NUM_BROKERS);
@Rule
- public final TestName name = new TestName();
+ public final TestName testName = new TestName();
private final List<KafkaStreams> streamsToCleanup = new ArrayList<>();
private final MockTime mockTime = cluster.time;
@@ -215,13 +216,13 @@ public class OptimizedKTableIntegrationTest {
}
private Properties streamsConfiguration() {
- final String applicationId = "streamsApp";
+ final String safeTestName = safeUniqueTestName(getClass(), testName);
final Properties config = new Properties();
config.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE);
- config.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId + name.getMethodName());
+ config.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + safeTestName);
config.put(StreamsConfig.APPLICATION_SERVER_CONFIG, "localhost:" + (++port));
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers());
- config.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory(applicationId).getPath());
+ config.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
config.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
index b3929f6..eb33107 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
@@ -98,6 +98,7 @@ import static java.time.Duration.ofMillis;
import static java.time.Duration.ofSeconds;
import static java.time.Instant.ofEpochMilli;
import static org.apache.kafka.common.utils.Utils.mkSet;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.startApplicationAndWaitUntilRunning;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForApplicationState;
import static org.apache.kafka.test.StreamsTestUtils.startKafkaStreamsAndWaitForRunningState;
@@ -140,14 +141,15 @@ public class QueryableStateIntegrationTest {
private Comparator<KeyValue<String, Long>> stringLongComparator;
private void createTopics() throws Exception {
- streamOne = streamOne + "-" + name.getMethodName();
- streamConcurrent = streamConcurrent + "-" + name.getMethodName();
- streamThree = streamThree + "-" + name.getMethodName();
- outputTopic = outputTopic + "-" + name.getMethodName();
- outputTopicConcurrent = outputTopicConcurrent + "-" + name.getMethodName();
- outputTopicConcurrentWindowed = outputTopicConcurrentWindowed + "-" + name.getMethodName();
- outputTopicThree = outputTopicThree + "-" + name.getMethodName();
- streamTwo = streamTwo + "-" + name.getMethodName();
+ final String safeTestName = safeUniqueTestName(getClass(), testName);
+ streamOne = streamOne + "-" + safeTestName;
+ streamConcurrent = streamConcurrent + "-" + safeTestName;
+ streamThree = streamThree + "-" + safeTestName;
+ outputTopic = outputTopic + "-" + safeTestName;
+ outputTopicConcurrent = outputTopicConcurrent + "-" + safeTestName;
+ outputTopicConcurrentWindowed = outputTopicConcurrentWindowed + "-" + safeTestName;
+ outputTopicThree = outputTopicThree + "-" + safeTestName;
+ streamTwo = streamTwo + "-" + safeTestName;
CLUSTER.createTopics(streamOne, streamConcurrent);
CLUSTER.createTopic(streamTwo, STREAM_TWO_PARTITIONS, NUM_REPLICAS);
CLUSTER.createTopic(streamThree, STREAM_THREE_PARTITIONS, 1);
@@ -191,18 +193,18 @@ public class QueryableStateIntegrationTest {
}
@Rule
- public TestName name = new TestName();
+ public TestName testName = new TestName();
@Before
public void before() throws Exception {
createTopics();
streamsConfiguration = new Properties();
- final String applicationId = "queryable-state-" + name.getMethodName();
+ final String safeTestName = safeUniqueTestName(getClass(), testName);
- streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
+ streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + safeTestName);
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
- streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory("state-" + applicationId).getPath());
+ streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/ResetPartitionTimeIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/ResetPartitionTimeIntegrationTest.java
index 0ed2972..460448e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/ResetPartitionTimeIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/ResetPartitionTimeIntegrationTest.java
@@ -53,9 +53,10 @@ import static java.util.Arrays.asList;
import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.apache.kafka.common.utils.Utils.mkProperties;
-import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.cleanStateAfterTest;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.cleanStateBeforeTest;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.getStartedStreams;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.quietlyCleanStateAfterTest;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
@@ -92,11 +93,11 @@ public class ResetPartitionTimeIntegrationTest {
public String processingGuarantee;
@Rule
- public TestName name = new TestName();
+ public TestName testName = new TestName();
@Test
public void shouldPreservePartitionTimeOnKafkaStreamRestart() {
- final String appId = name.getMethodName();
+ final String appId = "app-" + safeUniqueTestName(getClass(), testName);
final String input = "input";
final String outputRaw = "output-raw";
@@ -156,7 +157,7 @@ public class ResetPartitionTimeIntegrationTest {
assertThat(lastRecordedTimestamp, is(5000L));
} finally {
kafkaStreams.close();
- cleanStateAfterTest(CLUSTER, kafkaStreams);
+ quietlyCleanStateAfterTest(CLUSTER, kafkaStreams);
}
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RocksDBMetricsIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/RocksDBMetricsIntegrationTest.java
index 394eeed..af7dad0 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/RocksDBMetricsIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/RocksDBMetricsIntegrationTest.java
@@ -60,6 +60,7 @@ import java.util.List;
import java.util.Properties;
import java.util.stream.Collectors;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
@@ -113,7 +114,7 @@ public class RocksDBMetricsIntegrationTest {
public String processingGuarantee;
@Rule
- public TestName name = new TestName();
+ public TestName testName = new TestName();
@Before
public void before() throws Exception {
@@ -154,10 +155,8 @@ public class RocksDBMetricsIntegrationTest {
private Properties streamsConfig() {
final Properties streamsConfiguration = new Properties();
- final String suffix = name.getMethodName()
- .replace('[', '_')
- .replace(']', '_');
- streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "test-application-" + suffix);
+ final String safeTestName = safeUniqueTestName(getClass(), testName);
+ streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "test-application-" + safeTestName);
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskCreationIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskCreationIntegrationTest.java
index 28b5dc5..3bf2d8f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskCreationIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskCreationIntegrationTest.java
@@ -45,6 +45,8 @@ import org.junit.rules.TestName;
import java.util.Properties;
import java.util.function.Predicate;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
+
@Category({IntegrationTest.class})
public class StandbyTaskCreationIntegrationTest {
@@ -75,11 +77,11 @@ public class StandbyTaskCreationIntegrationTest {
}
private Properties streamsConfiguration() {
- final String applicationId = "testApp" + testName.getMethodName();
+ final String safeTestName = safeUniqueTestName(getClass(), testName);
final Properties streamsConfiguration = new Properties();
- streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
+ streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + safeTestName);
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
- streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory(applicationId).getPath());
+ streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
streamsConfiguration.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSIntegrationTest.java
index e65c0fc..8fa3913 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSIntegrationTest.java
@@ -51,6 +51,7 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
import static org.apache.kafka.test.TestUtils.waitForCondition;
import static org.junit.Assert.assertTrue;
@@ -72,8 +73,8 @@ public class StandbyTaskEOSIntegrationTest {
@Parameterized.Parameter
public String eosConfig;
- private final String appId = "eos-test-app";
- private final String inputTopic = "input";
+ private String appId;
+ private String inputTopic;
@ClassRule
public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(3);
@@ -83,6 +84,9 @@ public class StandbyTaskEOSIntegrationTest {
@Before
public void createTopics() throws Exception {
+ final String safeTestName = safeUniqueTestName(getClass(), testName);
+ appId = "app-" + safeTestName;
+ inputTopic = "input-" + safeTestName;
CLUSTER.deleteTopicsAndWait(inputTopic, appId + "-KSTREAM-AGGREGATE-STATE-STORE-0000000001-changelog");
CLUSTER.createTopic(inputTopic, 1, 3);
}
@@ -159,10 +163,7 @@ public class StandbyTaskEOSIntegrationTest {
private Properties props(final String stateDirPath) {
final Properties streamsConfiguration = new Properties();
- final String suffix = testName.getMethodName()
- .replace('[', '_')
- .replace(']', '_');
- streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, appId + suffix);
+ streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, appId);
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, stateDirPath);
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java
index 68195b5..6829177 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java
@@ -24,10 +24,10 @@ import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyQueryMetadata;
-import org.apache.kafka.streams.StreamsBuilder;
-import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StoreQueryParameters;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.Consumed;
@@ -38,7 +38,6 @@ import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.TestUtils;
-
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
@@ -56,6 +55,7 @@ import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.startApplicationAndWaitUntilRunning;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
@@ -318,13 +318,13 @@ public class StoreQueryIntegrationTest {
}
private Properties streamsConfiguration() {
- final String applicationId = "streamsApp" + testName.getMethodName();
+ final String safeTestName = safeUniqueTestName(getClass(), testName);
final Properties config = new Properties();
config.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE);
- config.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
+ config.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + safeTestName);
config.put(StreamsConfig.APPLICATION_SERVER_CONFIG, "localhost:" + (++port));
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers());
- config.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory(applicationId).getPath());
+ config.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
config.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/StoreUpgradeIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/StoreUpgradeIntegrationTest.java
index 9db1e6b..153f434 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/StoreUpgradeIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/StoreUpgradeIntegrationTest.java
@@ -56,29 +56,31 @@ import java.util.Properties;
import static java.util.Arrays.asList;
import static java.util.Collections.singletonList;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
@Category({IntegrationTest.class})
public class StoreUpgradeIntegrationTest {
- private static String inputStream;
private static final String STORE_NAME = "store";
+ private String inputStream;
private KafkaStreams kafkaStreams;
@ClassRule
public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);
+ @Rule
+ public TestName testName = new TestName();
+
@Before
public void createTopics() throws Exception {
- inputStream = "input-stream-" + testName.getMethodName();
+ inputStream = "input-stream-" + safeUniqueTestName(getClass(), testName);
CLUSTER.createTopic(inputStream);
}
- @Rule
- public TestName testName = new TestName();
-
private Properties props() {
final Properties streamsConfiguration = new Properties();
- streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "addId-" + testName.getMethodName());
+ final String safeTestName = safeUniqueTestName(getClass(), testName);
+ streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + safeTestName);
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionDurabilityIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionDurabilityIntegrationTest.java
index beb9ec7..93941d0 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionDurabilityIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionDurabilityIntegrationTest.java
@@ -45,8 +45,10 @@ import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.TestUtils;
import org.junit.ClassRule;
+import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
@@ -56,7 +58,6 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
-import java.util.Locale;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
@@ -69,9 +70,10 @@ import static java.util.Arrays.asList;
import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.apache.kafka.common.utils.Utils.mkProperties;
-import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.cleanStateAfterTest;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.cleanStateBeforeTest;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.getStartedStreams;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.quietlyCleanStateAfterTest;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
import static org.apache.kafka.streams.kstream.Suppressed.BufferConfig.maxRecords;
import static org.apache.kafka.streams.kstream.Suppressed.untilTimeLimit;
import static org.hamcrest.CoreMatchers.is;
@@ -81,12 +83,18 @@ import static org.hamcrest.Matchers.equalTo;
@RunWith(Parameterized.class)
@Category({IntegrationTest.class})
public class SuppressionDurabilityIntegrationTest {
+ private static final Logger LOG = LoggerFactory.getLogger(SuppressionDurabilityIntegrationTest.class);
+
@ClassRule
public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(
3,
mkProperties(mkMap()),
0L
);
+
+ @Rule
+ public TestName testName = new TestName();
+
private static final StringDeserializer STRING_DESERIALIZER = new StringDeserializer();
private static final StringSerializer STRING_SERIALIZER = new StringSerializer();
private static final Serde<String> STRING_SERDE = Serdes.String();
@@ -107,8 +115,8 @@ public class SuppressionDurabilityIntegrationTest {
@Test
public void shouldRecoverBufferAfterShutdown() {
- final String testId = "-shouldRecoverBufferAfterShutdown";
- final String appId = getClass().getSimpleName().toLowerCase(Locale.getDefault()) + testId;
+ final String testId = safeUniqueTestName(getClass(), testName);
+ final String appId = "appId_" + testId;
final String input = "input" + testId;
final String storeName = "counts";
final String outputSuppressed = "output-suppressed" + testId;
@@ -243,7 +251,7 @@ public class SuppressionDurabilityIntegrationTest {
} finally {
driver.close();
- cleanStateAfterTest(CLUSTER, driver);
+ quietlyCleanStateAfterTest(CLUSTER, driver);
}
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionIntegrationTest.java
index 8c3be2f..9d7c23d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionIntegrationTest.java
@@ -66,8 +66,8 @@ import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.apache.kafka.common.utils.Utils.mkProperties;
import static org.apache.kafka.streams.StreamsConfig.AT_LEAST_ONCE;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.DEFAULT_TIMEOUT;
-import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.cleanStateAfterTest;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.cleanStateBeforeTest;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.quietlyCleanStateAfterTest;
import static org.apache.kafka.streams.kstream.Suppressed.BufferConfig.maxBytes;
import static org.apache.kafka.streams.kstream.Suppressed.BufferConfig.maxRecords;
import static org.apache.kafka.streams.kstream.Suppressed.untilTimeLimit;
@@ -150,7 +150,7 @@ public class SuppressionIntegrationTest {
assertThat(suppressedRecords, is(true));
} finally {
driver.close();
- cleanStateAfterTest(CLUSTER, driver);
+ quietlyCleanStateAfterTest(CLUSTER, driver);
}
}
@@ -203,7 +203,7 @@ public class SuppressionIntegrationTest {
assertThat(suppressedRecords, is(true));
} finally {
driver.close();
- cleanStateAfterTest(CLUSTER, driver);
+ quietlyCleanStateAfterTest(CLUSTER, driver);
}
}
@@ -272,7 +272,7 @@ public class SuppressionIntegrationTest {
verifyErrorShutdown(driver);
} finally {
driver.close();
- cleanStateAfterTest(CLUSTER, driver);
+ quietlyCleanStateAfterTest(CLUSTER, driver);
}
}
@@ -314,7 +314,7 @@ public class SuppressionIntegrationTest {
verifyErrorShutdown(driver);
} finally {
driver.close();
- cleanStateAfterTest(CLUSTER, driver);
+ quietlyCleanStateAfterTest(CLUSTER, driver);
}
}
@@ -374,7 +374,7 @@ public class SuppressionIntegrationTest {
assertThat(suppressedRecords, is(true));
} finally {
driver.close();
- cleanStateAfterTest(CLUSTER, driver);
+ quietlyCleanStateAfterTest(CLUSTER, driver);
}
}
@@ -430,7 +430,7 @@ public class SuppressionIntegrationTest {
assertThat(suppressedRecords, is(true));
} finally {
driver.close();
- cleanStateAfterTest(CLUSTER, driver);
+ quietlyCleanStateAfterTest(CLUSTER, driver);
}
}
@@ -492,7 +492,7 @@ public class SuppressionIntegrationTest {
assertThat(suppressedRecords, is(true));
} finally {
driver.close();
- cleanStateAfterTest(CLUSTER, driver);
+ quietlyCleanStateAfterTest(CLUSTER, driver);
}
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
index 122c0e9..eb7fa4c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
@@ -50,6 +50,9 @@ import org.apache.kafka.streams.processor.internals.ThreadStateTransitionValidat
import org.apache.kafka.streams.state.QueryableStoreType;
import org.apache.kafka.test.TestCondition;
import org.apache.kafka.test.TestUtils;
+import org.junit.rules.TestName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import scala.Option;
import java.io.File;
@@ -91,6 +94,7 @@ import static org.junit.Assert.fail;
public class IntegrationTestUtils {
public static final long DEFAULT_TIMEOUT = 60 * 1000L;
+ private static final Logger LOG = LoggerFactory.getLogger(IntegrationTestUtils.class);
/*
* Records state transition for StreamThread
@@ -112,6 +116,19 @@ public class IntegrationTestUtils {
}
/**
+ * Gives a test name that is safe to be used in application ids, topic names, etc.
+ * The name is safe even for parameterized methods.
+ */
+ public static String safeUniqueTestName(final Class<?> testClass, final TestName testName) {
+ return (testClass.getSimpleName() + testName.getMethodName())
+ .replace('.', '_')
+ .replace('[', '_')
+ .replace(']', '_')
+ .replace(' ', '_')
+ .replace('=', '_');
+ }
+
+ /**
* Removes local state stores. Useful to reset state in-between integration test runs.
*
* @param streamsConfiguration Streams configuration settings
@@ -145,12 +162,12 @@ public class IntegrationTestUtils {
}
}
- public static void cleanStateAfterTest(final EmbeddedKafkaCluster cluster, final KafkaStreams driver) {
- driver.cleanUp();
+ public static void quietlyCleanStateAfterTest(final EmbeddedKafkaCluster cluster, final KafkaStreams driver) {
try {
+ driver.cleanUp();
cluster.deleteAllTopicsAndWait(DEFAULT_TIMEOUT);
- } catch (final InterruptedException e) {
- throw new RuntimeException(e);
+ } catch (final RuntimeException | InterruptedException e) {
+ LOG.warn("Ignoring failure to clean test state", e);
}
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableForeignKeyJoinScenarioTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableForeignKeyJoinScenarioTest.java
index 1a43a2f..ab84e05 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableForeignKeyJoinScenarioTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableForeignKeyJoinScenarioTest.java
@@ -45,6 +45,7 @@ import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.apache.kafka.common.utils.Utils.mkProperties;
import static org.apache.kafka.common.utils.Utils.mkSet;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
@@ -239,7 +240,8 @@ public class KTableKTableForeignKeyJoinScenarioTest {
private void validateTopologyCanProcessData(final StreamsBuilder builder) {
final Properties config = new Properties();
- config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "dummy-" + testName.getMethodName());
+ final String safeTestName = safeUniqueTestName(getClass(), testName);
+ config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "dummy-" + safeTestName);
config.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy");
config.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class.getName());
config.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class.getName());
diff --git a/streams/src/test/java/org/apache/kafka/test/StreamsTestUtils.java b/streams/src/test/java/org/apache/kafka/test/StreamsTestUtils.java
index 50e3de7..afc3785 100644
--- a/streams/src/test/java/org/apache/kafka/test/StreamsTestUtils.java
+++ b/streams/src/test/java/org/apache/kafka/test/StreamsTestUtils.java
@@ -106,7 +106,9 @@ public final class StreamsTestUtils {
kafkaStreams.start();
assertThat(
"KafkaStreams did not transit to RUNNING state within " + timeoutMs + " milli seconds.",
- countDownLatch.await(timeoutMs, TimeUnit.MILLISECONDS), equalTo(true));
+ countDownLatch.await(timeoutMs, TimeUnit.MILLISECONDS),
+ equalTo(true)
+ );
}
public static <K, V> List<KeyValue<K, V>> toList(final Iterator<KeyValue<K, V>> iterator) {