Posted to commits@kafka.apache.org by gu...@apache.org on 2018/01/18 21:55:06 UTC

[kafka] branch trunk updated: MINOR: Improve on reset integration test (#4436)

This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 70f25b9  MINOR: Improve on reset integration test (#4436)
70f25b9 is described below

commit 70f25b95192bb7d56c62c8b8bc80b478b8e08ef9
Author: Guozhang Wang <wa...@gmail.com>
AuthorDate: Thu Jan 18 13:54:59 2018 -0800

    MINOR: Improve on reset integration test (#4436)
    
    * Parameterize abstract reset integration test
    
    Reviewers: Damian Guy <da...@gmail.com>, Matthias J. Sax <mj...@apache.org>
---
 .../integration/AbstractResetIntegrationTest.java  | 534 +++++++++------------
 .../streams/integration/ResetIntegrationTest.java  |  64 +--
 .../integration/ResetIntegrationWithSslTest.java   |  62 ++-
 3 files changed, 275 insertions(+), 385 deletions(-)
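
In short, the abstract base class is parameterized through shared static fields instead of the overridable getClientSslConfig() hook it previously used: each concrete test assigns testId and cluster (and, for the SSL variant, sslConfig) before delegating to the common prepareTest()/cleanupTest() lifecycle. A minimal sketch of the resulting pattern — the names are taken from the diff below, but the simplified bodies are illustrative only:

    // Base class: holds the knobs the subclasses set (sketch).
    public abstract class AbstractResetIntegrationTest {
        static String testId;                        // per-subclass test-id prefix
        static EmbeddedKafkaCluster cluster;         // per-subclass embedded cluster
        static Map<String, Object> sslConfig = null; // only the SSL variant sets this

        void prepareTest() throws Exception {
            // recreate topics, build shared client configs, seed 10 input records
        }

        void cleanupTest() throws Exception {
            // close the streams instance, purge local state
        }
    }

    // Subclass: wires the knobs and re-exposes the shared scenarios as @Test methods.
    public class ResetIntegrationTest extends AbstractResetIntegrationTest {
        @ClassRule
        public static final EmbeddedKafkaCluster CLUSTER =
            new EmbeddedKafkaCluster(1, new Properties());

        @Before
        public void before() throws Exception {
            testId = "reset-integration-test";
            cluster = CLUSTER; // plaintext listeners; sslConfig stays null
            prepareTest();
        }
    }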

diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java
index 26673ca..5819b6d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java
@@ -21,11 +21,13 @@ import kafka.tools.StreamsResetter;
 import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.admin.KafkaAdminClient;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.config.SslConfigs;
-import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.config.types.Password;
 import org.apache.kafka.common.serialization.LongDeserializer;
 import org.apache.kafka.common.serialization.LongSerializer;
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.streams.KafkaStreams;
@@ -40,271 +42,319 @@ import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.streams.kstream.Produced;
 import org.apache.kafka.streams.kstream.TimeWindows;
 import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.test.IntegrationTest;
 import org.apache.kafka.test.TestCondition;
 import org.apache.kafka.test.TestUtils;
+import org.junit.AfterClass;
 import org.junit.Assert;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.junit.Rule;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TemporaryFolder;
 
 import java.io.BufferedWriter;
 import java.io.File;
 import java.io.FileWriter;
-import java.io.IOException;
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Calendar;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.TimeUnit;
 
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.MatcherAssert.assertThat;
 
-abstract class AbstractResetIntegrationTest {
-    private final static Logger log = LoggerFactory.getLogger(AbstractResetIntegrationTest.class);
+@Category({IntegrationTest.class})
+public abstract class AbstractResetIntegrationTest {
+    static String testId;
+    static EmbeddedKafkaCluster cluster;
+    static Map<String, Object> sslConfig = null;
+    private static KafkaStreams streams;
+    private static MockTime mockTime;
+    private static AdminClient adminClient = null;
+    private static KafkaAdminClient kafkaAdminClient = null;
+
+    @AfterClass
+    public static void afterClassCleanup() {
+        if (adminClient != null) {
+            adminClient.close();
+            adminClient = null;
+        }
+        if (kafkaAdminClient != null) {
+            kafkaAdminClient.close(10, TimeUnit.SECONDS);
+            kafkaAdminClient = null;
+        }
+    }
+
+    private String appID;
+    private Properties commonClientConfig;
+
+    private void prepareEnvironment() {
+        if (adminClient == null) {
+            adminClient = AdminClient.create(commonClientConfig);
+        }
+        if (kafkaAdminClient == null) {
+            kafkaAdminClient =  (KafkaAdminClient) org.apache.kafka.clients.admin.AdminClient.create(commonClientConfig);
+        }
+
+        // we align time to seconds to get clean window boundaries and thus ensure the same result for each run
+        // otherwise, input records could fall into different windows for different runs depending on the initial mock time
+        final long alignedTime = (System.currentTimeMillis() / 1000 + 1) * 1000;
+        mockTime = cluster.time;
+        mockTime.setCurrentTimeMs(alignedTime);
+    }
+
+    private void prepareConfigs() {
+        commonClientConfig = new Properties();
+        commonClientConfig.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers());
 
-    static final int NUM_BROKERS = 1;
+        if (sslConfig != null) {
+            commonClientConfig.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, sslConfig.get(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG));
+            commonClientConfig.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, ((Password) sslConfig.get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG)).value());
+            commonClientConfig.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
+        }
+
+        PRODUCER_CONFIG.put(ProducerConfig.ACKS_CONFIG, "all");
+        PRODUCER_CONFIG.put(ProducerConfig.RETRIES_CONFIG, 0);
+        PRODUCER_CONFIG.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class);
+        PRODUCER_CONFIG.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        PRODUCER_CONFIG.putAll(commonClientConfig);
+
+        RESULT_CONSUMER_CONFIG.put(ConsumerConfig.GROUP_ID_CONFIG, testId + "-result-consumer");
+        RESULT_CONSUMER_CONFIG.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        RESULT_CONSUMER_CONFIG.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class);
+        RESULT_CONSUMER_CONFIG.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class);
+        RESULT_CONSUMER_CONFIG.putAll(commonClientConfig);
+
+        STREAMS_CONFIG.put(StreamsConfig.STATE_DIR_CONFIG, testFolder.getRoot().getPath());
+        STREAMS_CONFIG.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Long().getClass());
+        STREAMS_CONFIG.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+        STREAMS_CONFIG.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
+        STREAMS_CONFIG.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
+        STREAMS_CONFIG.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100);
+        STREAMS_CONFIG.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        STREAMS_CONFIG.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "" + STREAMS_CONSUMER_TIMEOUT);
+        STREAMS_CONFIG.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true);
+        STREAMS_CONFIG.putAll(commonClientConfig);
+    }
+
+    @Rule
+    public final TemporaryFolder testFolder = new TemporaryFolder(TestUtils.tempDirectory());
 
-    private static final String APP_ID = "cleanup-integration-test";
     private static final String INPUT_TOPIC = "inputTopic";
     private static final String OUTPUT_TOPIC = "outputTopic";
     private static final String OUTPUT_TOPIC_2 = "outputTopic2";
     private static final String OUTPUT_TOPIC_2_RERUN = "outputTopic2_rerun";
     private static final String INTERMEDIATE_USER_TOPIC = "userTopic";
-    private static final String NON_EXISTING_TOPIC = "nonExistingTopic2";
+    private static final String NON_EXISTING_TOPIC = "nonExistingTopic";
 
     private static final long STREAMS_CONSUMER_TIMEOUT = 2000L;
     private static final long CLEANUP_CONSUMER_TIMEOUT = 2000L;
     private static final int TIMEOUT_MULTIPLIER = 5;
 
-    private static AdminClient adminClient = null;
-    private static KafkaAdminClient kafkaAdminClient = null;
-    private static int testNo = 0;
-
-    static EmbeddedKafkaCluster cluster;
-    static String bootstrapServers;
-    static MockTime mockTime;
-
-    private final AbstractResetIntegrationTest.WaitUntilConsumerGroupGotClosed consumerGroupInactive = new AbstractResetIntegrationTest.WaitUntilConsumerGroupGotClosed();
-
-    private class WaitUntilConsumerGroupGotClosed implements TestCondition {
+    private final TestCondition consumerGroupInactiveCondition = new TestCondition() {
         @Override
         public boolean conditionMet() {
-            return adminClient.describeConsumerGroup(APP_ID, 0).consumers().get().isEmpty();
+            return adminClient.describeConsumerGroup(testId + "-result-consumer", 0).consumers().get().isEmpty();
         }
+    };
+
+    private static final Properties STREAMS_CONFIG = new Properties();
+    private final static Properties PRODUCER_CONFIG = new Properties();
+    private final static Properties RESULT_CONSUMER_CONFIG = new Properties();
+
+    void prepareTest() throws Exception {
+        cluster.deleteAndRecreateTopics(INPUT_TOPIC, OUTPUT_TOPIC, OUTPUT_TOPIC_2, OUTPUT_TOPIC_2_RERUN);
+
+        prepareConfigs();
+        prepareEnvironment();
+
+        add10InputElements();
     }
 
-    static void afterClassGlobalCleanup() {
-        if (adminClient != null) {
-            adminClient.close();
-            adminClient = null;
+    void cleanupTest() throws Exception {
+        if (streams != null) {
+            streams.close(30, TimeUnit.SECONDS);
         }
+        IntegrationTestUtils.purgeLocalStreamsState(STREAMS_CONFIG);
+    }
 
-        if (kafkaAdminClient != null) {
-            kafkaAdminClient.close(10, TimeUnit.SECONDS);
-            kafkaAdminClient = null;
+    private void add10InputElements() throws java.util.concurrent.ExecutionException, InterruptedException {
+        List<KeyValue<Long, String>> records = Arrays.asList(KeyValue.pair(0L, "aaa"),
+                KeyValue.pair(1L, "bbb"),
+                KeyValue.pair(0L, "ccc"),
+                KeyValue.pair(1L, "ddd"),
+                KeyValue.pair(0L, "eee"),
+                KeyValue.pair(1L, "fff"),
+                KeyValue.pair(0L, "ggg"),
+                KeyValue.pair(1L, "hhh"),
+                KeyValue.pair(0L, "iii"),
+                KeyValue.pair(1L, "jjj"));
+
+        for (KeyValue<Long, String> record : records) {
+            mockTime.sleep(10);
+            IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(record), PRODUCER_CONFIG, mockTime.milliseconds());
         }
     }
 
-    void beforePrepareTest() throws Exception {
-        ++testNo;
-        mockTime = cluster.time;
-        bootstrapServers = cluster.bootstrapServers();
-
-        // we align time to seconds to get clean window boundaries and thus ensure the same result for each run
-        // otherwise, input records could fall into different windows for different runs depending on the initial mock time
-        final long alignedTime = (System.currentTimeMillis() / 1000 + 1) * 1000;
-        mockTime.setCurrentTimeMs(alignedTime);
+    void shouldNotAllowToResetWhileStreamsIsRunning() throws Exception {
+        appID = testId + "-not-reset-during-runtime";
+        final String[] parameters = new String[] {
+            "--application-id", appID,
+            "--bootstrap-servers", cluster.bootstrapServers(),
+            "--input-topics", NON_EXISTING_TOPIC };
+        final Properties cleanUpConfig = new Properties();
+        cleanUpConfig.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100);
+        cleanUpConfig.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "" + CLEANUP_CONSUMER_TIMEOUT);
 
-        Properties sslConfig = getClientSslConfig();
-        if (sslConfig == null) {
-            sslConfig = new Properties();
-            sslConfig.put("bootstrap.servers", bootstrapServers);
-        }
+        STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID);
 
-        if (adminClient == null) {
-            adminClient = AdminClient.create(sslConfig);
-        }
+        // RUN
+        streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), STREAMS_CONFIG);
+        streams.start();
 
-        if (kafkaAdminClient == null) {
-            kafkaAdminClient =  (KafkaAdminClient) org.apache.kafka.clients.admin.AdminClient.create(sslConfig);
-        }
+        final int exitCode = new StreamsResetter().run(parameters, cleanUpConfig);
+        Assert.assertEquals(1, exitCode);
 
-        // busy wait until cluster (ie, ConsumerGroupCoordinator) is available
-        while (true) {
-            Thread.sleep(50);
+        streams.close();
+    }
 
-            try {
-                TestUtils.waitForCondition(consumerGroupInactive, TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT,
-                    "Test consumer group active even after waiting " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms.");
-            } catch (final TimeoutException e) {
-                continue;
-            }
-            break;
-        }
+    public void shouldNotAllowToResetWhenInputTopicAbsent() throws Exception {
+        appID = testId + "-not-reset-without-input-topic";
+        final String[] parameters = new String[] {
+            "--application-id", appID,
+            "--bootstrap-servers", cluster.bootstrapServers(),
+            "--input-topics", NON_EXISTING_TOPIC };
+        final Properties cleanUpConfig = new Properties();
+        cleanUpConfig.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100);
+        cleanUpConfig.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "" + CLEANUP_CONSUMER_TIMEOUT);
 
-        prepareInputData();
+        final int exitCode = new StreamsResetter().run(parameters, cleanUpConfig);
+        Assert.assertEquals(1, exitCode);
     }
 
-    Properties getClientSslConfig() {
-        return null;
+    public void shouldNotAllowToResetWhenIntermediateTopicAbsent() throws Exception {
+        appID = testId + "-not-reset-without-intermediate-topic";
+        final String[] parameters = new String[] {
+            "--application-id", appID,
+            "--bootstrap-servers", cluster.bootstrapServers(),
+            "--input-topics", NON_EXISTING_TOPIC };
+        final Properties cleanUpConfig = new Properties();
+        cleanUpConfig.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100);
+        cleanUpConfig.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "" + CLEANUP_CONSUMER_TIMEOUT);
+
+        final int exitCode = new StreamsResetter().run(parameters, cleanUpConfig);
+        Assert.assertEquals(1, exitCode);
     }
 
     void testReprocessingFromScratchAfterResetWithoutIntermediateUserTopic() throws Exception {
-        final Properties sslConfig = getClientSslConfig();
-        final Properties streamsConfiguration = prepareTest();
-
-        final Properties resultTopicConsumerConfig = new Properties();
-        if (sslConfig != null) {
-            resultTopicConsumerConfig.putAll(sslConfig);
-        }
-        resultTopicConsumerConfig.putAll(TestUtils.consumerConfig(
-            bootstrapServers,
-            APP_ID + "-standard-consumer-" + OUTPUT_TOPIC,
-            LongDeserializer.class,
-            LongDeserializer.class));
+        appID = testId + "-from-scratch";
+        STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID);
 
         // RUN
-        KafkaStreams streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), streamsConfiguration);
+        streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), STREAMS_CONFIG);
         streams.start();
-        final List<KeyValue<Long, Long>> result = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
-            resultTopicConsumerConfig,
-            OUTPUT_TOPIC,
-            10);
+        final List<KeyValue<Long, Long>> result = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(RESULT_CONSUMER_CONFIG, OUTPUT_TOPIC, 10);
 
         streams.close();
-        TestUtils.waitForCondition(consumerGroupInactive, TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT,
+        TestUtils.waitForCondition(consumerGroupInactiveCondition, TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT,
             "Streams Application consumer group did not time out after " + (TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT) + " ms.");
 
         // RESET
-        streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), streamsConfiguration);
+        streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), STREAMS_CONFIG);
         streams.cleanUp();
-        cleanGlobal(sslConfig, false, null, null);
-        TestUtils.waitForCondition(consumerGroupInactive, TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT,
+        cleanGlobal(false, null, null);
+        TestUtils.waitForCondition(consumerGroupInactiveCondition, TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT,
             "Reset Tool consumer group did not time out after " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms.");
 
         assertInternalTopicsGotDeleted(null);
 
         // RE-RUN
         streams.start();
-        final List<KeyValue<Long, Long>> resultRerun = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
-            resultTopicConsumerConfig,
-            OUTPUT_TOPIC,
-            10);
+        final List<KeyValue<Long, Long>> resultRerun = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(RESULT_CONSUMER_CONFIG, OUTPUT_TOPIC, 10);
         streams.close();
 
         assertThat(resultRerun, equalTo(result));
 
-        TestUtils.waitForCondition(consumerGroupInactive, TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT,
+        TestUtils.waitForCondition(consumerGroupInactiveCondition, TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT,
             "Reset Tool consumer group did not time out after " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms.");
-        cleanGlobal(sslConfig, false, null, null);
+        cleanGlobal(false, null, null);
     }
 
     void testReprocessingFromScratchAfterResetWithIntermediateUserTopic() throws Exception {
         cluster.createTopic(INTERMEDIATE_USER_TOPIC);
 
-        final Properties sslConfig = getClientSslConfig();
-        final Properties streamsConfiguration = prepareTest();
-
-        final Properties resultTopicConsumerConfig = new Properties();
-        if (sslConfig != null) {
-            resultTopicConsumerConfig.putAll(sslConfig);
-        }
-        resultTopicConsumerConfig.putAll(TestUtils.consumerConfig(
-            bootstrapServers,
-            APP_ID + "-standard-consumer-" + OUTPUT_TOPIC,
-            LongDeserializer.class,
-            LongDeserializer.class));
+        appID = testId + "-from-scratch-with-intermediate-topic";
+        STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID);
 
         // RUN
-        KafkaStreams streams = new KafkaStreams(setupTopologyWithIntermediateUserTopic(OUTPUT_TOPIC_2), streamsConfiguration);
+        streams = new KafkaStreams(setupTopologyWithIntermediateUserTopic(OUTPUT_TOPIC_2), STREAMS_CONFIG);
         streams.start();
-        final List<KeyValue<Long, Long>> result = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
-            resultTopicConsumerConfig,
-            OUTPUT_TOPIC,
-            10);
+        final List<KeyValue<Long, Long>> result = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(RESULT_CONSUMER_CONFIG, OUTPUT_TOPIC, 10);
         // receive only first values to make sure intermediate user topic is not consumed completely
         // => required to test "seekToEnd" for intermediate topics
-        final List<KeyValue<Long, Long>> result2 = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
-            resultTopicConsumerConfig,
-            OUTPUT_TOPIC_2,
-            40
-        );
+        final List<KeyValue<Long, Long>> result2 = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(RESULT_CONSUMER_CONFIG, OUTPUT_TOPIC_2, 40);
 
         streams.close();
-        TestUtils.waitForCondition(consumerGroupInactive, TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT,
+        TestUtils.waitForCondition(consumerGroupInactiveCondition, TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT,
             "Streams Application consumer group did not time out after " + (TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT) + " ms.");
 
         // insert bad record to make sure intermediate user topic gets seekToEnd()
         mockTime.sleep(1);
-        Properties producerConfig = sslConfig;
-        if (producerConfig == null) {
-            producerConfig = new Properties();
-        }
-        producerConfig.putAll(TestUtils.producerConfig(bootstrapServers, LongSerializer.class, StringSerializer.class));
+        KeyValue<Long, String> badMessage = new KeyValue<>(-1L, "badRecord-ShouldBeSkipped");
         IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
             INTERMEDIATE_USER_TOPIC,
-            Collections.singleton(new KeyValue<>(-1L, "badRecord-ShouldBeSkipped")),
-            producerConfig,
+            Collections.singleton(badMessage),
+            PRODUCER_CONFIG,
             mockTime.milliseconds());
 
         // RESET
-        streams = new KafkaStreams(setupTopologyWithIntermediateUserTopic(OUTPUT_TOPIC_2_RERUN), streamsConfiguration);
+        streams = new KafkaStreams(setupTopologyWithIntermediateUserTopic(OUTPUT_TOPIC_2_RERUN), STREAMS_CONFIG);
         streams.cleanUp();
-        cleanGlobal(sslConfig, true, null, null);
-        TestUtils.waitForCondition(consumerGroupInactive, TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT,
+        cleanGlobal(true, null, null);
+        TestUtils.waitForCondition(consumerGroupInactiveCondition, TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT,
             "Reset Tool consumer group did not time out after " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms.");
 
         assertInternalTopicsGotDeleted(INTERMEDIATE_USER_TOPIC);
 
         // RE-RUN
         streams.start();
-        final List<KeyValue<Long, Long>> resultRerun = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
-            resultTopicConsumerConfig,
-            OUTPUT_TOPIC,
-            10);
-        final List<KeyValue<Long, Long>> resultRerun2 = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
-            resultTopicConsumerConfig,
-            OUTPUT_TOPIC_2_RERUN,
-            40);
+        final List<KeyValue<Long, Long>> resultRerun = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(RESULT_CONSUMER_CONFIG, OUTPUT_TOPIC, 10);
+        final List<KeyValue<Long, Long>> resultRerun2 = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(RESULT_CONSUMER_CONFIG, OUTPUT_TOPIC_2_RERUN, 40);
         streams.close();
 
         assertThat(resultRerun, equalTo(result));
         assertThat(resultRerun2, equalTo(result2));
 
-        TestUtils.waitForCondition(consumerGroupInactive, TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT,
+        final Properties props = TestUtils.consumerConfig(cluster.bootstrapServers(), testId + "-result-consumer", LongDeserializer.class, StringDeserializer.class, commonClientConfig);
+        final List<KeyValue<Long, String>> resultIntermediate = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(props, INTERMEDIATE_USER_TOPIC, 21);
+
+        for (int i = 0; i < 10; i++) {
+            assertThat(resultIntermediate.get(i), equalTo(resultIntermediate.get(i + 11)));
+        }
+        assertThat(resultIntermediate.get(10), equalTo(badMessage));
+
+        TestUtils.waitForCondition(consumerGroupInactiveCondition, TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT,
             "Reset Tool consumer group did not time out after " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms.");
-        cleanGlobal(sslConfig, true, null, null);
+        cleanGlobal(true, null, null);
 
         cluster.deleteTopicAndWait(INTERMEDIATE_USER_TOPIC);
     }
 
     void testReprocessingFromFileAfterResetWithoutIntermediateUserTopic() throws Exception {
-        final Properties sslConfig = getClientSslConfig();
-        final Properties streamsConfiguration = prepareTest();
-
-        final Properties resultTopicConsumerConfig = new Properties();
-        if (sslConfig != null) {
-            resultTopicConsumerConfig.putAll(sslConfig);
-        }
-        resultTopicConsumerConfig.putAll(TestUtils.consumerConfig(
-            bootstrapServers,
-            APP_ID + "-standard-consumer-" + OUTPUT_TOPIC,
-            LongDeserializer.class,
-            LongDeserializer.class));
+        appID = testId + "-from-file";
+        STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID);
 
         // RUN
-        KafkaStreams streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), streamsConfiguration);
+        streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), STREAMS_CONFIG);
         streams.start();
-        final List<KeyValue<Long, Long>> result = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
-            resultTopicConsumerConfig,
-            OUTPUT_TOPIC,
-            10);
+        final List<KeyValue<Long, Long>> result = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(RESULT_CONSUMER_CONFIG, OUTPUT_TOPIC, 10);
 
         streams.close();
-        TestUtils.waitForCondition(consumerGroupInactive, TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT,
+        TestUtils.waitForCondition(consumerGroupInactiveCondition, TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT,
             "Streams Application consumer group did not time out after " + (TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT) + " ms.");
 
         // RESET
@@ -314,11 +364,11 @@ abstract class AbstractResetIntegrationTest {
             writer.close();
         }
 
-        streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), streamsConfiguration);
+        streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), STREAMS_CONFIG);
         streams.cleanUp();
 
-        cleanGlobal(sslConfig, false, "--from-file", resetFile.getAbsolutePath());
-        TestUtils.waitForCondition(consumerGroupInactive, TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT,
+        cleanGlobal(false, "--from-file", resetFile.getAbsolutePath());
+        TestUtils.waitForCondition(consumerGroupInactiveCondition, TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT,
             "Reset Tool consumer group did not time out after " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms.");
 
         assertInternalTopicsGotDeleted(null);
@@ -327,44 +377,28 @@ abstract class AbstractResetIntegrationTest {
 
         // RE-RUN
         streams.start();
-        final List<KeyValue<Long, Long>> resultRerun = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
-            resultTopicConsumerConfig,
-            OUTPUT_TOPIC,
-            5);
+        final List<KeyValue<Long, Long>> resultRerun = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(RESULT_CONSUMER_CONFIG, OUTPUT_TOPIC, 5);
         streams.close();
 
         result.remove(0);
         assertThat(resultRerun, equalTo(result));
 
-        TestUtils.waitForCondition(consumerGroupInactive, TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT,
+        TestUtils.waitForCondition(consumerGroupInactiveCondition, TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT,
             "Reset Tool consumer group did not time out after " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms.");
-        cleanGlobal(sslConfig, false, null, null);
+        cleanGlobal(false, null, null);
     }
 
     void testReprocessingFromDateTimeAfterResetWithoutIntermediateUserTopic() throws Exception {
-        final Properties sslConfig = getClientSslConfig();
-        final Properties streamsConfiguration = prepareTest();
-
-        final Properties resultTopicConsumerConfig = new Properties();
-        if (sslConfig != null) {
-            resultTopicConsumerConfig.putAll(sslConfig);
-        }
-        resultTopicConsumerConfig.putAll(TestUtils.consumerConfig(
-            bootstrapServers,
-            APP_ID + "-standard-consumer-" + OUTPUT_TOPIC,
-            LongDeserializer.class,
-            LongDeserializer.class));
+        appID = testId + "-from-datetime";
+        STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID);
 
         // RUN
-        KafkaStreams streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), streamsConfiguration);
+        streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), STREAMS_CONFIG);
         streams.start();
-        final List<KeyValue<Long, Long>> result = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
-            resultTopicConsumerConfig,
-            OUTPUT_TOPIC,
-            10);
+        final List<KeyValue<Long, Long>> result = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(RESULT_CONSUMER_CONFIG, OUTPUT_TOPIC, 10);
 
         streams.close();
-        TestUtils.waitForCondition(consumerGroupInactive, TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT,
+        TestUtils.waitForCondition(consumerGroupInactiveCondition, TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT,
             "Streams Application consumer group did not time out after " + (TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT) + " ms.");
 
         // RESET
@@ -374,7 +408,7 @@ abstract class AbstractResetIntegrationTest {
             writer.close();
         }
 
-        streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), streamsConfiguration);
+        streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), STREAMS_CONFIG);
         streams.cleanUp();
 
 
@@ -382,8 +416,8 @@ abstract class AbstractResetIntegrationTest {
         final Calendar calendar = Calendar.getInstance();
         calendar.add(Calendar.DATE, -1);
 
-        cleanGlobal(sslConfig, false, "--to-datetime", format.format(calendar.getTime()));
-        TestUtils.waitForCondition(consumerGroupInactive, TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT,
+        cleanGlobal(false, "--to-datetime", format.format(calendar.getTime()));
+        TestUtils.waitForCondition(consumerGroupInactiveCondition, TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT,
             "Reset Tool consumer group did not time out after " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms.");
 
         assertInternalTopicsGotDeleted(null);
@@ -392,43 +426,27 @@ abstract class AbstractResetIntegrationTest {
 
         // RE-RUN
         streams.start();
-        final List<KeyValue<Long, Long>> resultRerun = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
-            resultTopicConsumerConfig,
-            OUTPUT_TOPIC,
-            10);
+        final List<KeyValue<Long, Long>> resultRerun = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(RESULT_CONSUMER_CONFIG, OUTPUT_TOPIC, 10);
         streams.close();
 
         assertThat(resultRerun, equalTo(result));
 
-        TestUtils.waitForCondition(consumerGroupInactive, TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT,
+        TestUtils.waitForCondition(consumerGroupInactiveCondition, TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT,
             "Reset Tool consumer group did not time out after " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms.");
-        cleanGlobal(sslConfig, false, null, null);
+        cleanGlobal(false, null, null);
     }
 
     void testReprocessingByDurationAfterResetWithoutIntermediateUserTopic() throws Exception {
-        final Properties sslConfig = getClientSslConfig();
-        final Properties streamsConfiguration = prepareTest();
-
-        final Properties resultTopicConsumerConfig = new Properties();
-        if (sslConfig != null) {
-            resultTopicConsumerConfig.putAll(sslConfig);
-        }
-        resultTopicConsumerConfig.putAll(TestUtils.consumerConfig(
-            bootstrapServers,
-            APP_ID + "-standard-consumer-" + OUTPUT_TOPIC,
-            LongDeserializer.class,
-            LongDeserializer.class));
+        appID = testId + "-from-duration";
+        STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID);
 
         // RUN
-        KafkaStreams streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), streamsConfiguration);
+        streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), STREAMS_CONFIG);
         streams.start();
-        final List<KeyValue<Long, Long>> result = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
-            resultTopicConsumerConfig,
-            OUTPUT_TOPIC,
-            10);
+        final List<KeyValue<Long, Long>> result = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(RESULT_CONSUMER_CONFIG, OUTPUT_TOPIC, 10);
 
         streams.close();
-        TestUtils.waitForCondition(consumerGroupInactive, TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT,
+        TestUtils.waitForCondition(consumerGroupInactiveCondition, TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT,
             "Streams Application consumer group did not time out after " + (TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT) + " ms.");
 
         // RESET
@@ -438,11 +456,11 @@ abstract class AbstractResetIntegrationTest {
             writer.close();
         }
 
-        streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), streamsConfiguration);
+        streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), STREAMS_CONFIG);
         streams.cleanUp();
-        cleanGlobal(sslConfig, false, "--by-duration", "PT1M");
+        cleanGlobal(false, "--by-duration", "PT1M");
 
-        TestUtils.waitForCondition(consumerGroupInactive, TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT,
+        TestUtils.waitForCondition(consumerGroupInactiveCondition, TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT,
             "Reset Tool consumer group did not time out after " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms.");
 
         assertInternalTopicsGotDeleted(null);
@@ -451,74 +469,14 @@ abstract class AbstractResetIntegrationTest {
 
         // RE-RUN
         streams.start();
-        final List<KeyValue<Long, Long>> resultRerun = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
-            resultTopicConsumerConfig,
-            OUTPUT_TOPIC,
-            10);
+        final List<KeyValue<Long, Long>> resultRerun = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(RESULT_CONSUMER_CONFIG, OUTPUT_TOPIC, 10);
         streams.close();
 
         assertThat(resultRerun, equalTo(result));
 
-        TestUtils.waitForCondition(consumerGroupInactive, TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT,
+        TestUtils.waitForCondition(consumerGroupInactiveCondition, TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT,
             "Reset Tool consumer group did not time out after " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms.");
-        cleanGlobal(sslConfig, false, null, null);
-    }
-
-    private Properties prepareTest() throws IOException {
-        Properties streamsConfiguration = getClientSslConfig();
-        if (streamsConfiguration == null) {
-            streamsConfiguration = new Properties();
-        }
-        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID + testNo);
-        streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
-        streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
-        streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Long().getClass());
-        streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
-        streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
-        streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
-        streamsConfiguration.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100);
-        streamsConfiguration.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "" + STREAMS_CONSUMER_TIMEOUT);
-        streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-        streamsConfiguration.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true);
-
-        IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);
-
-        return streamsConfiguration;
-    }
-
-    private void prepareInputData() throws Exception {
-        cluster.deleteAndRecreateTopics(INPUT_TOPIC, OUTPUT_TOPIC, OUTPUT_TOPIC_2, OUTPUT_TOPIC_2_RERUN);
-
-        add10InputElements();
-    }
-
-    private void add10InputElements() throws java.util.concurrent.ExecutionException, InterruptedException {
-        Properties producerConfig = getClientSslConfig();
-        if (producerConfig == null) {
-            producerConfig = new Properties();
-        }
-        producerConfig.putAll(TestUtils.producerConfig(bootstrapServers, LongSerializer.class, StringSerializer.class));
-
-        mockTime.sleep(10);
-        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(0L, "aaa")), producerConfig, mockTime.milliseconds());
-        mockTime.sleep(10);
-        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(1L, "bbb")), producerConfig, mockTime.milliseconds());
-        mockTime.sleep(10);
-        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(0L, "ccc")), producerConfig, mockTime.milliseconds());
-        mockTime.sleep(10);
-        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(1L, "ddd")), producerConfig, mockTime.milliseconds());
-        mockTime.sleep(10);
-        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(0L, "eee")), producerConfig, mockTime.milliseconds());
-        mockTime.sleep(10);
-        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(1L, "fff")), producerConfig, mockTime.milliseconds());
-        mockTime.sleep(1);
-        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(0L, "ggg")), producerConfig, mockTime.milliseconds());
-        mockTime.sleep(1);
-        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(1L, "hhh")), producerConfig, mockTime.milliseconds());
-        mockTime.sleep(1);
-        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(0L, "iii")), producerConfig, mockTime.milliseconds());
-        mockTime.sleep(1);
-        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(1L, "jjj")), producerConfig, mockTime.milliseconds());
+        cleanGlobal(false, null, null);
     }
 
     private Topology setupTopologyWithIntermediateUserTopic(final String outputTopic2) {
@@ -570,14 +528,13 @@ abstract class AbstractResetIntegrationTest {
         return builder.build();
     }
 
-    private void cleanGlobal(final Properties sslConfig,
-                             final boolean withIntermediateTopics,
+    private void cleanGlobal(final boolean withIntermediateTopics,
                              final String resetScenario,
                              final String resetScenarioArg) throws Exception {
         // leaving --zookeeper arg here to ensure tool works if users add it
         final List<String> parameterList = new ArrayList<>(
-            Arrays.asList("--application-id", APP_ID + testNo,
-                "--bootstrap-servers", bootstrapServers,
+            Arrays.asList("--application-id", appID,
+                "--bootstrap-servers", cluster.bootstrapServers(),
                 "--input-topics", INPUT_TOPIC));
         if (withIntermediateTopics) {
             parameterList.add("--intermediate-topics");
@@ -588,7 +545,7 @@ abstract class AbstractResetIntegrationTest {
             final BufferedWriter writer = new BufferedWriter(new FileWriter(configFile));
             writer.write(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG + "=SSL\n");
             writer.write(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG + "=" + sslConfig.get(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG) + "\n");
-            writer.write(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG + "=" + sslConfig.get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG) + "\n");
+            writer.write(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG + "=" + ((Password) sslConfig.get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG)).value() + "\n");
             writer.close();
 
             parameterList.add("--config-file");
@@ -607,36 +564,10 @@ abstract class AbstractResetIntegrationTest {
         cleanUpConfig.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100);
         cleanUpConfig.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "" + CLEANUP_CONSUMER_TIMEOUT);
 
-        log.info("Calling StreamsResetter with parameters {} and configs {}", parameters, cleanUpConfig);
-
         final int exitCode = new StreamsResetter().run(parameters, cleanUpConfig);
         Assert.assertEquals(0, exitCode);
     }
 
-    void shouldNotAllowToResetWhileStreamsIsRunning() throws Exception {
-
-        final Properties streamsConfiguration = prepareTest();
-        final List<String> parameterList = new ArrayList<>(
-                Arrays.asList("--application-id", APP_ID + testNo,
-                        "--bootstrap-servers", bootstrapServers,
-                        "--input-topics", NON_EXISTING_TOPIC));
-
-        final String[] parameters = parameterList.toArray(new String[parameterList.size()]);
-        final Properties cleanUpConfig = new Properties();
-        cleanUpConfig.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100);
-        cleanUpConfig.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "" + CLEANUP_CONSUMER_TIMEOUT);
-
-        // RUN
-        KafkaStreams streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), streamsConfiguration);
-        streams.start();
-
-        final int exitCode = new StreamsResetter().run(parameters, cleanUpConfig);
-        Assert.assertEquals(1, exitCode);
-
-        streams.close();
-
-    }
-
     private void assertInternalTopicsGotDeleted(final String intermediateUserTopic) throws Exception {
         // do not use list topics request, but read from the embedded cluster's zookeeper path directly to confirm
         if (intermediateUserTopic != null) {
@@ -645,5 +576,4 @@ abstract class AbstractResetIntegrationTest {
             cluster.waitForRemainingTopics(30000, INPUT_TOPIC, OUTPUT_TOPIC, OUTPUT_TOPIC_2, OUTPUT_TOPIC_2_RERUN, TestUtils.GROUP_METADATA_TOPIC_NAME);
         }
     }
-
 }
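
Two details in the rewritten base class above are easy to miss. First, prepareEnvironment() rounds the mock clock up to the next whole second so the windowed aggregations see identical window boundaries on every run; a worked example of the arithmetic (the wall-clock value is illustrative):

    long now = 1516312499_123L;             // arbitrary wall-clock millis
    long aligned = (now / 1000 + 1) * 1000; // -> 1516312500000, the next full second
    // the ten input records are then produced 10 ms apart from this point,
    // so they fall into the same time windows in the run and the re-run

Second, the --config-file written in cleanGlobal(...) now unwraps the truststore password with ((Password) ...).value(). Kafka's Password type masks its value in toString(), so concatenating the raw map entry writes the literal string "[hidden]" instead of the secret; a sketch of the difference:

    Password pw = (Password) sslConfig.get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG);
    writer.write("ssl.truststore.password=" + pw);          // writes "[hidden]" -- broken
    writer.write("ssl.truststore.password=" + pw.value());  // writes the actual password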
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
index ef9a67d..6c0cc5e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
@@ -18,20 +18,15 @@ package org.apache.kafka.streams.integration;
 
 import kafka.server.KafkaConfig$;
 import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.test.IntegrationTest;
-import org.junit.AfterClass;
-import org.junit.Assert;
+
+import org.junit.After;
 import org.junit.Before;
 import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
-import kafka.tools.StreamsResetter;
 
 import java.util.Properties;
-import java.util.Arrays;
-import java.util.ArrayList;
-import java.util.List;
 
 
 /**
@@ -42,31 +37,29 @@ public class ResetIntegrationTest extends AbstractResetIntegrationTest {
 
     @ClassRule
     public static final EmbeddedKafkaCluster CLUSTER;
-    private static final long CLEANUP_CONSUMER_TIMEOUT = 2000L;
-    private static final String APP_ID = "Integration-test";
-    private static final String NON_EXISTING_TOPIC = "nonExistingTopic";
-    private static int testNo = 1;
+
+    private static final String TEST_ID = "reset-integration-test";
 
     static {
-        final Properties props = new Properties();
+        final Properties brokerProps = new Properties();
         // we double the value passed to `time.sleep` in each iteration in one of the map functions, so we disable
         // expiration of connections by the brokers to avoid errors when `AdminClient` sends requests after potentially
         // very long sleep times
-        props.put(KafkaConfig$.MODULE$.ConnectionsMaxIdleMsProp(), -1L);
-        CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS, props);
-        cluster = CLUSTER;
-    }
-
-    @AfterClass
-    public static void globalCleanup() {
-        afterClassGlobalCleanup();
+        brokerProps.put(KafkaConfig$.MODULE$.ConnectionsMaxIdleMsProp(), -1L);
+        CLUSTER = new EmbeddedKafkaCluster(1, brokerProps);
     }
 
     @Before
     public void before() throws Exception {
-        beforePrepareTest();
+        testId = TEST_ID;
+        cluster = CLUSTER;
+        prepareTest();
     }
 
+    @After
+    public void after() throws Exception {
+        cleanupTest();
+    }
 
     @Test
     public void testReprocessingFromScratchAfterResetWithoutIntermediateUserTopic() throws Exception {
@@ -100,36 +93,11 @@ public class ResetIntegrationTest extends AbstractResetIntegrationTest {
 
     @Test
     public void shouldNotAllowToResetWhenInputTopicAbsent() throws Exception {
-
-        final List<String> parameterList = new ArrayList<>(
-                Arrays.asList("--application-id", APP_ID + testNo,
-                        "--bootstrap-servers", bootstrapServers,
-                        "--input-topics", NON_EXISTING_TOPIC));
-
-        final String[] parameters = parameterList.toArray(new String[parameterList.size()]);
-        final Properties cleanUpConfig = new Properties();
-        cleanUpConfig.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100);
-        cleanUpConfig.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "" + CLEANUP_CONSUMER_TIMEOUT);
-
-        final int exitCode = new StreamsResetter().run(parameters, cleanUpConfig);
-        Assert.assertEquals(1, exitCode);
+        super.shouldNotAllowToResetWhenInputTopicAbsent();
     }
 
     @Test
     public void shouldNotAllowToResetWhenIntermediateTopicAbsent() throws Exception {
-
-        final List<String> parameterList = new ArrayList<>(
-                Arrays.asList("--application-id", APP_ID + testNo,
-                        "--bootstrap-servers", bootstrapServers,
-                        "--intermediate-topics", NON_EXISTING_TOPIC));
-
-        final String[] parameters = parameterList.toArray(new String[parameterList.size()]);
-        final Properties cleanUpConfig = new Properties();
-        cleanUpConfig.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100);
-        cleanUpConfig.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "" + CLEANUP_CONSUMER_TIMEOUT);
-
-        final int exitCode = new StreamsResetter().run(parameters, cleanUpConfig);
-        Assert.assertEquals(1, exitCode);
+        super.shouldNotAllowToResetWhenIntermediateTopicAbsent();
     }
-
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationWithSslTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationWithSslTest.java
index abf4c38..cfdcfb0 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationWithSslTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationWithSslTest.java
@@ -17,21 +17,17 @@
 package org.apache.kafka.streams.integration;
 
 import kafka.server.KafkaConfig$;
-import org.apache.kafka.clients.CommonClientConfigs;
-import org.apache.kafka.common.config.SslConfigs;
-import org.apache.kafka.common.config.types.Password;
 import org.apache.kafka.common.network.Mode;
 import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
 import org.apache.kafka.test.IntegrationTest;
 import org.apache.kafka.test.TestSslUtils;
 import org.apache.kafka.test.TestUtils;
-import org.junit.AfterClass;
+import org.junit.After;
 import org.junit.Before;
 import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
-import java.util.Map;
 import java.util.Properties;
 
 /**
@@ -40,54 +36,50 @@ import java.util.Properties;
 @Category({IntegrationTest.class})
 public class ResetIntegrationWithSslTest extends AbstractResetIntegrationTest {
 
-    private static Map<String, Object> sslConfig;
-    static {
-        try {
-            sslConfig = TestSslUtils.createSslConfig(false, true, Mode.SERVER, TestUtils.tempFile(), "testCert");
-        } catch (final Exception e) {
-            throw new RuntimeException(e);
-        }
-    }
-
     @ClassRule
     public static final EmbeddedKafkaCluster CLUSTER;
 
+    private static final String TEST_ID = "reset-with-ssl-integration-test";
+
     static {
-        final Properties props = new Properties();
+        final Properties brokerProps = new Properties();
         // we double the value passed to `time.sleep` in each iteration in one of the map functions, so we disable
         // expiration of connections by the brokers to avoid errors when `AdminClient` sends requests after potentially
         // very long sleep times
-        props.put(KafkaConfig$.MODULE$.ConnectionsMaxIdleMsProp(), -1L);
-        props.put(KafkaConfig$.MODULE$.ListenersProp(), "SSL://localhost:0");
-        props.put(KafkaConfig$.MODULE$.InterBrokerListenerNameProp(), "SSL");
-        props.putAll(sslConfig);
-        CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS, props);
-        cluster = CLUSTER;
-    }
+        brokerProps.put(KafkaConfig$.MODULE$.ConnectionsMaxIdleMsProp(), -1L);
+
+        try {
+            sslConfig = TestSslUtils.createSslConfig(false, true, Mode.SERVER, TestUtils.tempFile(), "testCert");
+
+            brokerProps.put(KafkaConfig$.MODULE$.ListenersProp(), "SSL://localhost:0");
+            brokerProps.put(KafkaConfig$.MODULE$.InterBrokerListenerNameProp(), "SSL");
+            brokerProps.putAll(sslConfig);
+        } catch (final Exception e) {
+            throw new RuntimeException(e);
+        }
 
-    @AfterClass
-    public static void globalCleanup() {
-        afterClassGlobalCleanup();
+        CLUSTER = new EmbeddedKafkaCluster(1, brokerProps);
     }
 
     @Before
     public void before() throws Exception {
-        beforePrepareTest();
+        testId = TEST_ID;
+        cluster = CLUSTER;
+        prepareTest();
     }
 
-    Properties getClientSslConfig() {
-        final Properties props = new Properties();
-
-        props.put("bootstrap.servers", CLUSTER.bootstrapServers());
-        props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, sslConfig.get(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG));
-        props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, ((Password) sslConfig.get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG)).value());
-        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
-
-        return props;
+    @After
+    public void after() throws Exception {
+        cleanupTest();
     }
 
     @Test
     public void testReprocessingFromScratchAfterResetWithoutIntermediateUserTopic() throws Exception {
         super.testReprocessingFromScratchAfterResetWithoutIntermediateUserTopic();
     }
+
+    @Test
+    public void testReprocessingFromScratchAfterResetWithIntermediateUserTopic() throws Exception {
+        super.testReprocessingFromScratchAfterResetWithIntermediateUserTopic();
+    }
 }
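
With the setup fully factored into the base class, adding a further cluster variant is now mostly wiring. A hypothetical sketch — the class name and test id below are illustrative only, not part of this commit:

    public class ResetIntegrationWithCustomBrokerTest extends AbstractResetIntegrationTest {
        @ClassRule
        public static final EmbeddedKafkaCluster CLUSTER =
            new EmbeddedKafkaCluster(1, new Properties()); // customize brokerProps here

        @Before
        public void before() throws Exception {
            testId = "reset-custom-broker-integration-test";
            cluster = CLUSTER;
            prepareTest();
        }

        @After
        public void after() throws Exception {
            cleanupTest();
        }

        @Test
        public void testReprocessingFromScratchAfterResetWithoutIntermediateUserTopic() throws Exception {
            super.testReprocessingFromScratchAfterResetWithoutIntermediateUserTopic();
        }
    }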

-- 
To stop receiving notification emails like this one, please contact
['"commits@kafka.apache.org" <co...@kafka.apache.org>'].