You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by vv...@apache.org on 2020/07/31 16:30:19 UTC

[kafka] branch 2.5 updated: KAFKA-10173: Use SmokeTest for upgrade system tests (#8938) (#8993)

This is an automated email from the ASF dual-hosted git repository.

vvcephei pushed a commit to branch 2.5
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.5 by this push:
     new 2601c67  KAFKA-10173: Use SmokeTest for upgrade system tests (#8938) (#8993)
2601c67 is described below

commit 2601c67203e50e5ac066c5e62e9e6e51faef6e07
Author: John Roesler <vv...@users.noreply.github.com>
AuthorDate: Fri Jul 31 11:28:49 2020 -0500

    KAFKA-10173: Use SmokeTest for upgrade system tests (#8938) (#8993)
    
    Replaces the previous upgrade test's trivial Streams app
    with the commonly used SmokeTest, exercising many more
    features. Also adjusts the test matrix to test upgrading
    from each released version since 2.0 to the current branch.
    
    Reviewers: Guozhang Wang <wa...@gmail.com>
---
 build.gradle                                       |  12 +
 gradle/dependencies.gradle                         |   2 +
 settings.gradle                                    |   1 +
 .../SmokeTestDriverIntegrationTest.java            |  13 +-
 .../kafka/streams/tests/SmokeTestClient.java       | 130 +++--
 .../apache/kafka/streams/tests/SmokeTestUtil.java  |  73 +--
 .../kafka/streams/tests/SmokeTestClient.java       | 156 ++---
 .../kafka/streams/tests/SmokeTestDriver.java       | 632 +++++++++++++++++++++
 .../apache/kafka/streams/tests/SmokeTestUtil.java  |  70 +--
 .../kafka/streams/tests/StreamsSmokeTest.java      |  99 ++++
 .../kafka/streams/tests/SmokeTestClient.java       | 126 ++--
 .../kafka/streams/tests/SmokeTestDriver.java       | 632 +++++++++++++++++++++
 .../apache/kafka/streams/tests/SmokeTestUtil.java  |  70 +--
 .../kafka/streams/tests/StreamsSmokeTest.java      |  99 ++++
 .../kafka/streams/tests/SmokeTestClient.java       | 126 ++--
 .../kafka/streams/tests/SmokeTestDriver.java       | 632 +++++++++++++++++++++
 .../apache/kafka/streams/tests/SmokeTestUtil.java  |  70 +--
 .../kafka/streams/tests/StreamsSmokeTest.java      |  99 ++++
 .../kafka/streams/tests/SmokeTestClient.java       | 126 ++--
 .../kafka/streams/tests/SmokeTestDriver.java       | 622 ++++++++++++++++++++
 .../apache/kafka/streams/tests/SmokeTestUtil.java  |  63 +-
 .../kafka/streams/tests/StreamsSmokeTest.java      |  99 ++++
 .../kafka/streams/tests/SmokeTestClient.java       | 126 ++--
 .../kafka/streams/tests/SmokeTestDriver.java       | 622 ++++++++++++++++++++
 .../apache/kafka/streams/tests/SmokeTestUtil.java  |  63 +-
 .../kafka/streams/tests/StreamsSmokeTest.java      |  99 ++++
 .../kafka/streams/tests/SmokeTestClient.java       | 126 ++--
 .../kafka/streams/tests/SmokeTestDriver.java       | 622 ++++++++++++++++++++
 .../apache/kafka/streams/tests/SmokeTestUtil.java  |  63 +-
 .../kafka/streams/tests/StreamsSmokeTest.java      |  99 ++++
 tests/docker/Dockerfile                            |   2 +
 tests/kafkatest/services/streams.py                |  77 ++-
 .../streams/streams_application_upgrade_test.py    | 297 ++++++++++
 .../tests/streams/streams_broker_bounce_test.py    |   2 +-
 tests/kafkatest/version.py                         |   4 +
 vagrant/base.sh                                    |   2 +
 36 files changed, 5531 insertions(+), 625 deletions(-)

diff --git a/build.gradle b/build.gradle
index 2274fec..2760592 100644
--- a/build.gradle
+++ b/build.gradle
@@ -1508,6 +1508,18 @@ project(':streams:upgrade-system-tests-24') {
   }
 }
 
+project(':streams:upgrade-system-tests-25') {
+  archivesBaseName = "kafka-streams-upgrade-system-tests-25"
+
+  dependencies {
+    testCompile libs.kafkaStreams_25
+  }
+
+  systemTestLibs {
+    dependsOn testJar
+  }
+}
+
 project(':jmh-benchmarks') {
 
   apply plugin: 'com.github.johnrengelman.shadow'
diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle
index c44f370..abb2c04 100644
--- a/gradle/dependencies.gradle
+++ b/gradle/dependencies.gradle
@@ -95,6 +95,7 @@ versions += [
   kafka_22: "2.2.2",
   kafka_23: "2.3.1",
   kafka_24: "2.4.0",
+  kafka_25: "2.5.0",
   lz4: "1.7.1",
   mavenArtifact: "3.6.3",
   metrics: "2.2.0",
@@ -165,6 +166,7 @@ libs += [
   kafkaStreams_22: "org.apache.kafka:kafka-streams:$versions.kafka_22",
   kafkaStreams_23: "org.apache.kafka:kafka-streams:$versions.kafka_23",
   kafkaStreams_24: "org.apache.kafka:kafka-streams:$versions.kafka_24",
+  kafkaStreams_25: "org.apache.kafka:kafka-streams:$versions.kafka_25",
   log4j: "log4j:log4j:$versions.log4j",
   lz4: "org.lz4:lz4-java:$versions.lz4",
   metrics: "com.yammer.metrics:metrics-core:$versions.metrics",
diff --git a/settings.gradle b/settings.gradle
index e9f04a2..25fda5b 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -42,4 +42,5 @@ include 'clients',
     'streams:upgrade-system-tests-22',
     'streams:upgrade-system-tests-23',
     'streams:upgrade-system-tests-24',
+    'streams:upgrade-system-tests-25',
     'tools'
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/SmokeTestDriverIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/SmokeTestDriverIntegrationTest.java
index 63e55b3..0f52b3f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/SmokeTestDriverIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/SmokeTestDriverIntegrationTest.java
@@ -16,6 +16,8 @@
  */
 package org.apache.kafka.streams.integration;
 
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
 import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
@@ -92,6 +94,13 @@ public class SmokeTestDriverIntegrationTest {
 
         final Properties props = new Properties();
         props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+        props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 3);
+        props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 2);
+        props.put(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, 100);
+        props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
+        props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3);
+        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        props.put(ProducerConfig.ACKS_CONFIG, "all");
 
         // cycle out Streams instances as long as the test is running.
         while (driver.isAlive()) {
@@ -103,10 +112,6 @@ public class SmokeTestDriverIntegrationTest {
             clients.add(smokeTestClient);
             smokeTestClient.start(props);
 
-            while (!clients.get(clients.size() - 1).started()) {
-                Thread.sleep(100);
-            }
-
             // let the oldest client die of "natural causes"
             if (clients.size() >= 3) {
                 final SmokeTestClient client = clients.remove(0);
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java
index db243fd..91d14af 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java
@@ -16,11 +16,10 @@
  */
 package org.apache.kafka.streams.tests;
 
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Bytes;
-import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.KafkaThread;
+import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
@@ -38,11 +37,15 @@ import org.apache.kafka.streams.kstream.TimeWindows;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.state.Stores;
 import org.apache.kafka.streams.state.WindowStore;
-import org.apache.kafka.test.TestUtils;
 
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
 import java.time.Duration;
 import java.time.Instant;
 import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 
 import static org.apache.kafka.streams.kstream.Suppressed.untilWindowCloses;
 
@@ -50,19 +53,42 @@ public class SmokeTestClient extends SmokeTestUtil {
 
     private final String name;
 
-    private Thread thread;
     private KafkaStreams streams;
     private boolean uncaughtException = false;
-    private boolean started;
-    private boolean closed;
+    private volatile boolean closed;
 
-    public SmokeTestClient(final String name) {
-        super();
-        this.name = name;
+    private static void addShutdownHook(final String name, final Runnable runnable) {
+        if (name != null) {
+            Runtime.getRuntime().addShutdownHook(KafkaThread.nonDaemon(name, runnable));
+        } else {
+            Runtime.getRuntime().addShutdownHook(new Thread(runnable));
+        }
     }
 
-    public boolean started() {
-        return started;
+    private static File tempDirectory() {
+        final String prefix = "kafka-";
+        final File file;
+        try {
+            file = Files.createTempDirectory(prefix).toFile();
+        } catch (final IOException ex) {
+            throw new RuntimeException("Failed to create a temp dir", ex);
+        }
+        file.deleteOnExit();
+
+        addShutdownHook("delete-temp-file-shutdown-hook", () -> {
+            try {
+                Utils.delete(file);
+            } catch (final IOException e) {
+                System.out.println("Error deleting " + file.getAbsolutePath());
+                e.printStackTrace(System.out);
+            }
+        });
+
+        return file;
+    }
+
+    public SmokeTestClient(final String name) {
+        this.name = name;
     }
 
     public boolean closed() {
@@ -70,17 +96,42 @@ public class SmokeTestClient extends SmokeTestUtil {
     }
 
     public void start(final Properties streamsProperties) {
-        streams = createKafkaStreams(streamsProperties);
+        final Topology build = getTopology();
+        streams = new KafkaStreams(build, getStreamsConfig(streamsProperties));
+
+        final CountDownLatch countDownLatch = new CountDownLatch(1);
+        streams.setStateListener((newState, oldState) -> {
+            System.out.printf("%s %s: %s -> %s%n", name, Instant.now(), oldState, newState);
+            if (oldState == KafkaStreams.State.REBALANCING && newState == KafkaStreams.State.RUNNING) {
+                countDownLatch.countDown();
+            }
+
+            if (newState == KafkaStreams.State.NOT_RUNNING) {
+                closed = true;
+            }
+        });
+
         streams.setUncaughtExceptionHandler((t, e) -> {
             System.out.println(name + ": SMOKE-TEST-CLIENT-EXCEPTION");
+            System.out.println(name + ": FATAL: An unexpected exception is encountered on thread " + t + ": " + e);
+            e.printStackTrace(System.out);
             uncaughtException = true;
-            e.printStackTrace();
+            streams.close(Duration.ofSeconds(30));
         });
 
-        Exit.addShutdownHook("streams-shutdown-hook", () -> close());
+        addShutdownHook("streams-shutdown-hook", this::close);
 
-        thread = new Thread(() -> streams.start());
-        thread.start();
+        streams.start();
+        try {
+            if (!countDownLatch.await(1, TimeUnit.MINUTES)) {
+                System.out.println(name + ": SMOKE-TEST-CLIENT-EXCEPTION: Didn't start in one minute");
+            }
+        } catch (final InterruptedException e) {
+            System.out.println(name + ": SMOKE-TEST-CLIENT-EXCEPTION: " + e);
+            e.printStackTrace(System.out);
+        }
+        System.out.println(name + ": SMOKE-TEST-CLIENT-STARTED");
+        System.out.println(name + " started at " + Instant.now());
     }
 
     public void closeAsync() {
@@ -88,17 +139,14 @@ public class SmokeTestClient extends SmokeTestUtil {
     }
 
     public void close() {
-        streams.close(Duration.ofSeconds(5));
-        // do not remove these printouts since they are needed for health scripts
-        if (!uncaughtException) {
+        final boolean wasClosed = streams.close(Duration.ofMinutes(1));
+
+        if (wasClosed && !uncaughtException) {
             System.out.println(name + ": SMOKE-TEST-CLIENT-CLOSED");
-        }
-        try {
-            thread.join();
-        } catch (final Exception ex) {
-            // do not remove these printouts since they are needed for health scripts
+        } else if (wasClosed) {
             System.out.println(name + ": SMOKE-TEST-CLIENT-EXCEPTION");
-            // ignore
+        } else {
+            System.out.println(name + ": SMOKE-TEST-CLIENT-EXCEPTION: Didn't close in time.");
         }
     }
 
@@ -106,39 +154,11 @@ public class SmokeTestClient extends SmokeTestUtil {
         final Properties fullProps = new Properties(props);
         fullProps.put(StreamsConfig.APPLICATION_ID_CONFIG, "SmokeTest");
         fullProps.put(StreamsConfig.CLIENT_ID_CONFIG, "SmokeTest-" + name);
-        fullProps.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 3);
-        fullProps.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 2);
-        fullProps.put(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, 100);
-        fullProps.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
-        fullProps.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3);
-        fullProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-        fullProps.put(ProducerConfig.ACKS_CONFIG, "all");
-        fullProps.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
+        fullProps.put(StreamsConfig.STATE_DIR_CONFIG, tempDirectory().getAbsolutePath());
         fullProps.putAll(props);
         return fullProps;
     }
 
-    private KafkaStreams createKafkaStreams(final Properties props) {
-        final Topology build = getTopology();
-        final KafkaStreams streamsClient = new KafkaStreams(build, getStreamsConfig(props));
-        streamsClient.setStateListener((newState, oldState) -> {
-            System.out.printf("%s %s: %s -> %s%n", name, Instant.now(), oldState, newState);
-            if (oldState == KafkaStreams.State.REBALANCING && newState == KafkaStreams.State.RUNNING) {
-                started = true;
-            }
-
-            if (newState == KafkaStreams.State.NOT_RUNNING) {
-                closed = true;
-            }
-        });
-        streamsClient.setUncaughtExceptionHandler((t, e) -> {
-            System.out.println(name + ": FATAL: An unexpected exception is encountered on thread " + t + ": " + e);
-            streamsClient.close(Duration.ofSeconds(30));
-        });
-
-        return streamsClient;
-    }
-
     public Topology getTopology() {
         final StreamsBuilder builder = new StreamsBuilder();
         final Consumed<String, Integer> stringIntConsumed = Consumed.with(stringSerde, intSerde);
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java
index 90e6ccd..6a7cb89 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java
@@ -27,11 +27,13 @@ import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-import java.io.File;
 import java.time.Instant;
 
 public class SmokeTestUtil {
+    private static final Logger LOG = LoggerFactory.getLogger(SmokeTestUtil.class);
 
     final static int END = Integer.MAX_VALUE;
 
@@ -45,21 +47,48 @@ public class SmokeTestUtil {
             public Processor<Object, Object> get() {
                 return new AbstractProcessor<Object, Object>() {
                     private int numRecordsProcessed = 0;
+                    private long smallestOffset = Long.MAX_VALUE;
+                    private long largestOffset = Long.MIN_VALUE;
 
                     @Override
                     public void init(final ProcessorContext context) {
                         super.init(context);
+                        LOG.info("[DEV] initializing processor: topic=" + topic + " taskId=" + context.taskId());
                         System.out.println("[DEV] initializing processor: topic=" + topic + " taskId=" + context.taskId());
+                        System.out.flush();
                         numRecordsProcessed = 0;
+                        smallestOffset = Long.MAX_VALUE;
+                        largestOffset = Long.MIN_VALUE;
                     }
 
                     @Override
                     public void process(final Object key, final Object value) {
                         numRecordsProcessed++;
-                        if (numRecordsProcessed % 100 == 0) {
-                            System.out.printf("%s: %s%n", name, Instant.now());
-                            System.out.println("processed " + numRecordsProcessed + " records from topic=" + topic);
+                        LOG.info("processed " + numRecordsProcessed + " records from topic=" + topic);
+                        System.out.printf("%s: %s%n", name, Instant.now());
+                        System.out.println("processed " + numRecordsProcessed + " records from topic=" + topic);
+
+                        if (smallestOffset > context().offset()) {
+                            smallestOffset = context().offset();
+                        }
+                        if (largestOffset < context().offset()) {
+                            largestOffset = context().offset();
+                        }
+                    }
+
+                    @Override
+                    public void close() {
+                        LOG.info("Close processor for task {}", context().taskId());
+                        System.out.printf("Close processor for task %s%n", context().taskId());
+                        System.out.println("processed " + numRecordsProcessed + " records");
+                        final long processed;
+                        if (largestOffset >= smallestOffset) {
+                            processed = 1L + largestOffset - smallestOffset;
+                        } else {
+                            processed = 0L;
                         }
+                        System.out.println("offset " + smallestOffset + " to " + largestOffset + " -> processed " + processed);
+                        System.out.flush();
                     }
                 };
             }
@@ -76,39 +105,19 @@ public class SmokeTestUtil {
     public static class Agg {
 
         KeyValueMapper<String, Long, KeyValue<String, Long>> selector() {
-            return new KeyValueMapper<String, Long, KeyValue<String, Long>>() {
-                @Override
-                public KeyValue<String, Long> apply(final String key, final Long value) {
-                    return new KeyValue<>(value == null ? null : Long.toString(value), 1L);
-                }
-            };
+            return (key, value) -> new KeyValue<>(value == null ? null : Long.toString(value), 1L);
         }
 
         public Initializer<Long> init() {
-            return new Initializer<Long>() {
-                @Override
-                public Long apply() {
-                    return 0L;
-                }
-            };
+            return () -> 0L;
         }
 
         Aggregator<String, Long, Long> adder() {
-            return new Aggregator<String, Long, Long>() {
-                @Override
-                public Long apply(final String aggKey, final Long value, final Long aggregate) {
-                    return aggregate + value;
-                }
-            };
+            return (aggKey, value, aggregate) -> aggregate + value;
         }
 
         Aggregator<String, Long, Long> remover() {
-            return new Aggregator<String, Long, Long>() {
-                @Override
-                public Long apply(final String aggKey, final Long value, final Long aggregate) {
-                    return aggregate - value;
-                }
-            };
+            return (aggKey, value, aggregate) -> aggregate - value;
         }
     }
 
@@ -120,14 +129,6 @@ public class SmokeTestUtil {
 
     static Serde<Double> doubleSerde = Serdes.Double();
 
-    static File createDir(final File parent, final String child) {
-        final File dir = new File(parent, child);
-
-        dir.mkdir();
-
-        return dir;
-    }
-
     public static void sleep(final long duration) {
         try {
             Thread.sleep(duration);
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java b/streams/upgrade-system-tests-20/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java
similarity index 73%
copy from streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java
copy to streams/upgrade-system-tests-20/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java
index db243fd..9a89978 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java
+++ b/streams/upgrade-system-tests-20/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java
@@ -16,48 +16,77 @@
  */
 package org.apache.kafka.streams.tests;
 
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Bytes;
-import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.KafkaThread;
+import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.Topology;
 import org.apache.kafka.streams.kstream.Consumed;
-import org.apache.kafka.streams.kstream.Grouped;
 import org.apache.kafka.streams.kstream.KGroupedStream;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.kstream.Produced;
-import org.apache.kafka.streams.kstream.Suppressed.BufferConfig;
+import org.apache.kafka.streams.kstream.Serialized;
 import org.apache.kafka.streams.kstream.TimeWindows;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.state.Stores;
 import org.apache.kafka.streams.state.WindowStore;
-import org.apache.kafka.test.TestUtils;
 
-import java.time.Duration;
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
 import java.time.Instant;
 import java.util.Properties;
-
-import static org.apache.kafka.streams.kstream.Suppressed.untilWindowCloses;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 
 public class SmokeTestClient extends SmokeTestUtil {
 
+    public static final int ONE_DAY = 24 * 60 * 60 * 1000;
+    public static final long TWO_DAYS = 2L * ONE_DAY;
     private final String name;
 
-    private Thread thread;
     private KafkaStreams streams;
     private boolean uncaughtException = false;
     private boolean started;
-    private boolean closed;
+    private volatile boolean closed;
+
+    private static void addShutdownHook(final String name, final Runnable runnable) {
+        if (name != null) {
+            Runtime.getRuntime().addShutdownHook(KafkaThread.nonDaemon(name, runnable));
+        } else {
+            Runtime.getRuntime().addShutdownHook(new Thread(runnable));
+        }
+    }
+
+    private static File tempDirectory() {
+        final String prefix = "kafka-";
+        final File file;
+        try {
+            file = Files.createTempDirectory(prefix).toFile();
+        } catch (final IOException ex) {
+            throw new RuntimeException("Failed to create a temp dir", ex);
+        }
+        file.deleteOnExit();
+
+        addShutdownHook("delete-temp-file-shutdown-hook", () -> {
+            try {
+                Utils.delete(file);
+            } catch (final IOException e) {
+                System.out.println("Error deleting " + file.getAbsolutePath());
+                e.printStackTrace(System.out);
+            }
+        });
+
+        return file;
+    }
 
     public SmokeTestClient(final String name) {
-        super();
         this.name = name;
     }
 
@@ -70,35 +99,54 @@ public class SmokeTestClient extends SmokeTestUtil {
     }
 
     public void start(final Properties streamsProperties) {
-        streams = createKafkaStreams(streamsProperties);
+        final Topology build = getTopology();
+        streams = new KafkaStreams(build, getStreamsConfig(streamsProperties));
+
+        final CountDownLatch countDownLatch = new CountDownLatch(1);
+        streams.setStateListener((newState, oldState) -> {
+            System.out.printf("%s %s: %s -> %s%n", name, Instant.now(), oldState, newState);
+            if (oldState == KafkaStreams.State.REBALANCING && newState == KafkaStreams.State.RUNNING) {
+                started = true;
+                countDownLatch.countDown();
+            }
+
+            if (newState == KafkaStreams.State.NOT_RUNNING) {
+                closed = true;
+            }
+        });
+
         streams.setUncaughtExceptionHandler((t, e) -> {
             System.out.println(name + ": SMOKE-TEST-CLIENT-EXCEPTION");
+            System.out.println(name + ": FATAL: An unexpected exception is encountered on thread " + t + ": " + e);
+            e.printStackTrace(System.out);
             uncaughtException = true;
-            e.printStackTrace();
+            streams.close(30, TimeUnit.SECONDS);
         });
 
-        Exit.addShutdownHook("streams-shutdown-hook", () -> close());
+        addShutdownHook("streams-shutdown-hook", this::close);
 
-        thread = new Thread(() -> streams.start());
-        thread.start();
-    }
-
-    public void closeAsync() {
-        streams.close(Duration.ZERO);
+        streams.start();
+        try {
+            if (!countDownLatch.await(1, TimeUnit.MINUTES)) {
+                System.out.println(name + ": SMOKE-TEST-CLIENT-EXCEPTION: Didn't start in one minute");
+            }
+        } catch (final InterruptedException e) {
+            System.out.println(name + ": SMOKE-TEST-CLIENT-EXCEPTION: " + e);
+            e.printStackTrace(System.out);
+        }
+        System.out.println(name + ": SMOKE-TEST-CLIENT-STARTED");
+        System.out.println(name + " started at " + Instant.now());
     }
 
     public void close() {
-        streams.close(Duration.ofSeconds(5));
-        // do not remove these printouts since they are needed for health scripts
-        if (!uncaughtException) {
+        final boolean closed = streams.close(1, TimeUnit.MINUTES);
+
+        if (closed && !uncaughtException) {
             System.out.println(name + ": SMOKE-TEST-CLIENT-CLOSED");
-        }
-        try {
-            thread.join();
-        } catch (final Exception ex) {
-            // do not remove these printouts since they are needed for health scripts
+        } else if (closed) {
             System.out.println(name + ": SMOKE-TEST-CLIENT-EXCEPTION");
-            // ignore
+        } else {
+            System.out.println(name + ": SMOKE-TEST-CLIENT-EXCEPTION: Didn't close");
         }
     }
 
@@ -106,39 +154,11 @@ public class SmokeTestClient extends SmokeTestUtil {
         final Properties fullProps = new Properties(props);
         fullProps.put(StreamsConfig.APPLICATION_ID_CONFIG, "SmokeTest");
         fullProps.put(StreamsConfig.CLIENT_ID_CONFIG, "SmokeTest-" + name);
-        fullProps.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 3);
-        fullProps.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 2);
-        fullProps.put(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, 100);
-        fullProps.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
-        fullProps.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3);
-        fullProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-        fullProps.put(ProducerConfig.ACKS_CONFIG, "all");
-        fullProps.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
+        fullProps.put(StreamsConfig.STATE_DIR_CONFIG, tempDirectory().getAbsolutePath());
         fullProps.putAll(props);
         return fullProps;
     }
 
-    private KafkaStreams createKafkaStreams(final Properties props) {
-        final Topology build = getTopology();
-        final KafkaStreams streamsClient = new KafkaStreams(build, getStreamsConfig(props));
-        streamsClient.setStateListener((newState, oldState) -> {
-            System.out.printf("%s %s: %s -> %s%n", name, Instant.now(), oldState, newState);
-            if (oldState == KafkaStreams.State.REBALANCING && newState == KafkaStreams.State.RUNNING) {
-                started = true;
-            }
-
-            if (newState == KafkaStreams.State.NOT_RUNNING) {
-                closed = true;
-            }
-        });
-        streamsClient.setUncaughtExceptionHandler((t, e) -> {
-            System.out.println(name + ": FATAL: An unexpected exception is encountered on thread " + t + ": " + e);
-            streamsClient.close(Duration.ofSeconds(30));
-        });
-
-        return streamsClient;
-    }
-
     public Topology getTopology() {
         final StreamsBuilder builder = new StreamsBuilder();
         final Consumed<String, Integer> stringIntConsumed = Consumed.with(stringSerde, intSerde);
@@ -149,34 +169,30 @@ public class SmokeTestClient extends SmokeTestUtil {
         data.process(SmokeTestUtil.printProcessorSupplier("data", name));
 
         // min
-        final KGroupedStream<String, Integer> groupedData = data.groupByKey(Grouped.with(stringSerde, intSerde));
+        final KGroupedStream<String, Integer> groupedData = data.groupByKey(Serialized.with(stringSerde, intSerde));
 
         final KTable<Windowed<String>, Integer> minAggregation = groupedData
-            .windowedBy(TimeWindows.of(Duration.ofDays(1)).grace(Duration.ofMinutes(1)))
+            .windowedBy(TimeWindows.of(ONE_DAY))
             .aggregate(
                 () -> Integer.MAX_VALUE,
                 (aggKey, value, aggregate) -> (value < aggregate) ? value : aggregate,
                 Materialized
                     .<String, Integer, WindowStore<Bytes, byte[]>>as("uwin-min")
                     .withValueSerde(intSerde)
-                    .withRetention(Duration.ofHours(25))
             );
 
         streamify(minAggregation, "min-raw");
 
-        streamify(minAggregation.suppress(untilWindowCloses(BufferConfig.unbounded())), "min-suppressed");
-
         minAggregation
             .toStream(new Unwindow<>())
             .filterNot((k, v) -> k.equals("flush"))
             .to("min", Produced.with(stringSerde, intSerde));
 
         final KTable<Windowed<String>, Integer> smallWindowSum = groupedData
-            .windowedBy(TimeWindows.of(Duration.ofSeconds(2)).advanceBy(Duration.ofSeconds(1)).grace(Duration.ofSeconds(30)))
+            .windowedBy(TimeWindows.of(2_000L).advanceBy(1_000L))
             .reduce((l, r) -> l + r);
 
         streamify(smallWindowSum, "sws-raw");
-        streamify(smallWindowSum.suppress(untilWindowCloses(BufferConfig.unbounded())), "sws-suppressed");
 
         final KTable<String, Integer> minTable = builder.table(
             "min",
@@ -187,7 +203,7 @@ public class SmokeTestClient extends SmokeTestUtil {
 
         // max
         groupedData
-            .windowedBy(TimeWindows.of(Duration.ofDays(2)))
+            .windowedBy(TimeWindows.of(TWO_DAYS))
             .aggregate(
                 () -> Integer.MIN_VALUE,
                 (aggKey, value, aggregate) -> (value > aggregate) ? value : aggregate,
@@ -204,7 +220,7 @@ public class SmokeTestClient extends SmokeTestUtil {
 
         // sum
         groupedData
-            .windowedBy(TimeWindows.of(Duration.ofDays(2)))
+            .windowedBy(TimeWindows.of(TWO_DAYS))
             .aggregate(
                 () -> 0L,
                 (aggKey, value, aggregate) -> (long) value + aggregate,
@@ -219,7 +235,7 @@ public class SmokeTestClient extends SmokeTestUtil {
 
         // cnt
         groupedData
-            .windowedBy(TimeWindows.of(Duration.ofDays(2)))
+            .windowedBy(TimeWindows.of(TWO_DAYS))
             .count(Materialized.as("uwin-cnt"))
             .toStream(new Unwindow<>())
             .filterNot((k, v) -> k.equals("flush"))
@@ -251,7 +267,7 @@ public class SmokeTestClient extends SmokeTestUtil {
 
         // test repartition
         final Agg agg = new Agg();
-        cntTable.groupBy(agg.selector(), Grouped.with(stringSerde, longSerde))
+        cntTable.groupBy(agg.selector(), Serialized.with(stringSerde, longSerde))
                 .aggregate(agg.init(), agg.adder(), agg.remover(),
                            Materialized.<String, Long>as(Stores.inMemoryKeyValueStore("cntByCnt"))
                                .withKeySerde(Serdes.String())
diff --git a/streams/upgrade-system-tests-20/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java b/streams/upgrade-system-tests-20/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
new file mode 100644
index 0000000..d0a7d22
--- /dev/null
+++ b/streams/upgrade-system-tests-20/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
@@ -0,0 +1,632 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.tests;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
+
+import java.io.ByteArrayOutputStream;
+import java.io.PrintStream;
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static java.util.Collections.emptyMap;
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+
+public class SmokeTestDriver extends SmokeTestUtil {
+    /**
+     * Every topic the driver works with: the "data" input topic plus the output topics.
+     * {@code verify(String, Map, int)} assigns and reads all partitions of each of these topics,
+     * and {@code NumberDeserializer} has a decoding rule for each of them.
+     */
+    private static final String[] TOPICS = {
+        "data",
+        "echo",
+        "max",
+        "min", "min-suppressed", "min-raw",
+        "dif",
+        "sum",
+        "sws-raw", "sws-suppressed",
+        "cnt",
+        "avg",
+        "tagg"
+    };
+
+    /**
+     * Threshold on consecutive empty polls: once all echo records were seen, the verifier gives up
+     * waiting for a passing result when its empty-poll counter exceeds this value.
+     */
+    private static final int MAX_RECORD_EMPTY_RETRIES = 30;
+
+    private static class ValueList {
+        public final String key;
+        private final int[] values;
+        private int index;
+
+        ValueList(final int min, final int max) {
+            key = min + "-" + max;
+
+            values = new int[max - min + 1];
+            for (int i = 0; i < values.length; i++) {
+                values[i] = min + i;
+            }
+            // We want to randomize the order of data to test not completely predictable processing order
+            // However, values are also use as a timestamp of the record. (TODO: separate data and timestamp)
+            // We keep some correlation of time and order. Thus, the shuffling is done with a sliding window
+            shuffle(values, 10);
+
+            index = 0;
+        }
+
+        int next() {
+            return (index < values.length) ? values[index++] : -1;
+        }
+    }
+
+    /**
+     * @return a fresh copy of the topic names used by the smoke test, so callers cannot modify the
+     *         driver's own array
+     */
+    public static String[] topics() {
+        return TOPICS.clone();
+    }
+
+    /**
+     * Produces records to the "data" topic in an endless loop; the method only ends by exception.
+     * Each iteration picks a random key, sends that key's next value, and sleeps 2 ms. Progress is
+     * printed every 100 records.
+     *
+     * @param kafka            bootstrap servers for the producer
+     * @param numKeys          number of distinct keys to produce for
+     * @param maxRecordsPerKey number of distinct values prepared per key
+     */
+    static void generatePerpetually(final String kafka,
+                                    final int numKeys,
+                                    final int maxRecordsPerKey) {
+        final ValueList[] valuesByKey = new ValueList[numKeys];
+        for (int k = 0; k < numKeys; k++) {
+            valuesByKey[k] = new ValueList(k, k + maxRecordsPerKey - 1);
+        }
+
+        final Random random = new Random();
+        int produced = 0;
+
+        try (final KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(generatorProperties(kafka))) {
+            while (true) {
+                final ValueList chosen = valuesByKey[random.nextInt(numKeys)];
+                final String key = chosen.key;
+                // next() yields -1 once a key's values are used up; that value is sent as-is
+                final int value = chosen.next();
+
+                producer.send(new ProducerRecord<>(
+                    "data",
+                    stringSerde.serializer().serialize("", key),
+                    intSerde.serializer().serialize("", value)
+                ));
+
+                produced++;
+                if (produced % 100 == 0) {
+                    System.out.println(Instant.now() + " " + produced + " records produced");
+                }
+                Utils.sleep(2);
+            }
+        }
+    }
+
+    /**
+     * Produces a bounded data set to the "data" topic and returns what was sent.
+     *
+     * For each of {@code numKeys} keys, {@code maxRecordsPerKey} distinct values are produced in
+     * random key order. Sends that fail with a TimeoutException are collected and re-sent in up to
+     * five rounds; any other send failure, or records still failing after the last round, ends
+     * the process via {@code Exit.exit(1)}. Finally one "flush" record with a timestamp two days
+     * in the future is sent to every partition of "data".
+     *
+     * @param kafka            bootstrap servers for the producer
+     * @param numKeys          number of distinct keys
+     * @param maxRecordsPerKey number of values produced per key
+     * @param timeToSpend      target duration, used to derive the pause between records
+     * @return an unmodifiable map from each key to the set of values handed to the producer for it
+     */
+    public static Map<String, Set<Integer>> generate(final String kafka,
+                                                     final int numKeys,
+                                                     final int maxRecordsPerKey,
+                                                     final Duration timeToSpend) {
+        final Properties producerProps = generatorProperties(kafka);
+
+
+        int numRecordsProduced = 0;
+
+        final Map<String, Set<Integer>> allData = new HashMap<>();
+        final ValueList[] data = new ValueList[numKeys];
+        for (int i = 0; i < numKeys; i++) {
+            data[i] = new ValueList(i, i + maxRecordsPerKey - 1);
+            allData.put(data[i].key, new HashSet<>());
+        }
+        final Random rand = new Random();
+
+        // number of keys that still have values left; they occupy data[0..remaining)
+        int remaining = data.length;
+
+        // spread the records over timeToSpend; the sleep below never goes under 2 ms per record
+        final long recordPauseTime = timeToSpend.toMillis() / numKeys / maxRecordsPerKey;
+
+        // filled by TestCallback with records whose send failed with a TimeoutException
+        List<ProducerRecord<byte[], byte[]>> needRetry = new ArrayList<>();
+
+        try (final KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(producerProps)) {
+            while (remaining > 0) {
+                final int index = rand.nextInt(remaining);
+                final String key = data[index].key;
+                final int value = data[index].next();
+
+                if (value < 0) {
+                    // this key is exhausted: overwrite its slot with the last active key
+                    remaining--;
+                    data[index] = data[remaining];
+                } else {
+
+                    final ProducerRecord<byte[], byte[]> record =
+                        new ProducerRecord<>(
+                            "data",
+                            stringSerde.serializer().serialize("", key),
+                            intSerde.serializer().serialize("", value)
+                        );
+
+                    producer.send(record, new TestCallback(record, needRetry));
+
+                    numRecordsProduced++;
+                    allData.get(key).add(value);
+                    if (numRecordsProduced % 100 == 0) {
+                        System.out.println(Instant.now() + " " + numRecordsProduced + " records produced");
+                    }
+                    Utils.sleep(Math.max(recordPauseTime, 2));
+                }
+            }
+            // flush before reading needRetry, which the send callbacks fill in
+            producer.flush();
+
+            int remainingRetries = 5;
+            while (!needRetry.isEmpty()) {
+                // failures of this round are collected in a fresh list while needRetry is iterated
+                final List<ProducerRecord<byte[], byte[]>> needRetry2 = new ArrayList<>();
+                for (final ProducerRecord<byte[], byte[]> record : needRetry) {
+                    System.out.println("retry producing " + stringSerde.deserializer().deserialize("", record.key()));
+                    producer.send(record, new TestCallback(record, needRetry2));
+                }
+                producer.flush();
+                needRetry = needRetry2;
+
+                if (--remainingRetries == 0 && !needRetry.isEmpty()) {
+                    System.err.println("Failed to produce all records after multiple retries");
+                    Exit.exit(1);
+                }
+            }
+
+            // now that we've sent everything, we'll send some final records with a timestamp high enough to flush out
+            // all suppressed records.
+            final List<PartitionInfo> partitions = producer.partitionsFor("data");
+            for (final PartitionInfo partition : partitions) {
+                // no callback here: a failure of a flush record is not detected by this method
+                producer.send(new ProducerRecord<>(
+                    partition.topic(),
+                    partition.partition(),
+                    System.currentTimeMillis() + Duration.ofDays(2).toMillis(),
+                    stringSerde.serializer().serialize("", "flush"),
+                    intSerde.serializer().serialize("", 0)
+                ));
+            }
+        }
+        return Collections.unmodifiableMap(allData);
+    }
+
+    /**
+     * Builds the configuration shared by the data generators: a producer named "SmokeTest" that
+     * sends raw byte-array keys and values and uses {@code acks=all}.
+     *
+     * @param kafka bootstrap servers to connect to
+     * @return a new producer configuration
+     */
+    private static Properties generatorProperties(final String kafka) {
+        final Properties config = new Properties();
+
+        // identity and connection
+        config.put(ProducerConfig.CLIENT_ID_CONFIG, "SmokeTest");
+        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
+
+        // keys and values are serialized by the caller, so the producer only passes bytes through
+        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
+        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
+
+        config.put(ProducerConfig.ACKS_CONFIG, "all");
+        return config;
+    }
+
+    /**
+     * Send callback that queues a record for retry when its send failed with a
+     * {@link TimeoutException}; any other failure prints the stack trace and calls
+     * {@code Exit.exit(1)}.
+     */
+    private static class TestCallback implements Callback {
+        private final ProducerRecord<byte[], byte[]> sentRecord;
+        private final List<ProducerRecord<byte[], byte[]>> retryQueue;
+
+        TestCallback(final ProducerRecord<byte[], byte[]> originalRecord,
+                     final List<ProducerRecord<byte[], byte[]>> needRetry) {
+            sentRecord = originalRecord;
+            retryQueue = needRetry;
+        }
+
+        @Override
+        public void onCompletion(final RecordMetadata metadata, final Exception exception) {
+            if (exception == null) {
+                // successful send: nothing to do
+                return;
+            }
+            if (!(exception instanceof TimeoutException)) {
+                exception.printStackTrace();
+                Exit.exit(1);
+                return;
+            }
+            retryQueue.add(sentRecord);
+        }
+    }
+
+    /**
+     * Shuffles {@code data} in place. Walking the array from left to right, each position is
+     * swapped with a randomly chosen position among itself and the following
+     * {@code windowSize - 1} positions (fewer near the end of the array).
+     *
+     * @param data       the array to shuffle in place
+     * @param windowSize size of the window a swap partner is picked from
+     */
+    private static void shuffle(final int[] data, @SuppressWarnings("SameParameterValue") final int windowSize) {
+        final Random random = new Random();
+        final int length = data.length;
+        int position = 0;
+        while (position < length) {
+            // the window shrinks near the end so the partner never falls outside the array
+            final int span = Math.min(length - position, windowSize);
+            final int partner = position + random.nextInt(span);
+
+            final int displaced = data[partner];
+            data[partner] = data[position];
+            data[position] = displaced;
+
+            position++;
+        }
+    }
+
+    /**
+     * Value deserializer for the verifier that picks the numeric type from the topic name:
+     * long for "sum", "cnt" and "tagg", double for "avg", and int for the remaining smoke test
+     * topics. Any other topic name is rejected with a RuntimeException.
+     */
+    public static class NumberDeserializer implements Deserializer<Number> {
+        @Override
+        public void configure(final Map<String, ?> configs, final boolean isKey) {
+            // nothing to configure
+        }
+
+        @Override
+        public Number deserialize(final String topic, final byte[] data) {
+            switch (topic) {
+                case "sum":
+                case "cnt":
+                case "tagg":
+                    return longSerde.deserializer().deserialize(topic, data);
+                case "avg":
+                    return doubleSerde.deserializer().deserialize(topic, data);
+                case "data":
+                case "echo":
+                case "min":
+                case "min-raw":
+                case "min-suppressed":
+                case "sws-raw":
+                case "sws-suppressed":
+                case "max":
+                case "dif":
+                    return intSerde.deserializer().deserialize(topic, data);
+                default:
+                    throw new RuntimeException("unknown topic: " + topic);
+            }
+        }
+
+        @Override
+        public void close() {
+            // nothing to release
+        }
+    }
+
+    public static VerificationResult verify(final String kafka,
+                                            final Map<String, Set<Integer>> inputs,
+                                            final int maxRecordsPerKey) {
+        final Properties props = new Properties();
+        props.put(ConsumerConfig.CLIENT_ID_CONFIG, "verifier");
+        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
+        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, NumberDeserializer.class);
+        props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
+
+        final KafkaConsumer<String, Number> consumer = new KafkaConsumer<>(props);
+        final List<TopicPartition> partitions = getAllPartitions(consumer, TOPICS);
+        consumer.assign(partitions);
+        consumer.seekToBeginning(partitions);
+
+        final int recordsGenerated = inputs.size() * maxRecordsPerKey;
+        int recordsProcessed = 0;
+        final Map<String, AtomicInteger> processed =
+            Stream.of(TOPICS)
+                  .collect(Collectors.toMap(t -> t, t -> new AtomicInteger(0)));
+
+        final Map<String, Map<String, LinkedList<ConsumerRecord<String, Number>>>> events = new HashMap<>();
+
+        VerificationResult verificationResult = new VerificationResult(false, "no results yet");
+        int retry = 0;
+        final long start = System.currentTimeMillis();
+        while (System.currentTimeMillis() - start < TimeUnit.MINUTES.toMillis(6)) {
+            final ConsumerRecords<String, Number> records = consumer.poll(Duration.ofSeconds(5));
+            if (records.isEmpty() && recordsProcessed >= recordsGenerated) {
+                verificationResult = verifyAll(inputs, events, false);
+                if (verificationResult.passed()) {
+                    break;
+                } else if (retry++ > MAX_RECORD_EMPTY_RETRIES) {
+                    System.out.println(Instant.now() + " Didn't get any more results, verification hasn't passed, and out of retries.");
+                    break;
+                } else {
+                    System.out.println(Instant.now() + " Didn't get any more results, but verification hasn't passed (yet). Retrying..." + retry);
+                }
+            } else {
+                System.out.println(Instant.now() + " Get some more results from " + records.partitions() + ", resetting retry.");
+
+                retry = 0;
+                for (final ConsumerRecord<String, Number> record : records) {
+                    final String key = record.key();
+
+                    final String topic = record.topic();
+                    processed.get(topic).incrementAndGet();
+
+                    if (topic.equals("echo")) {
+                        recordsProcessed++;
+                        if (recordsProcessed % 100 == 0) {
+                            System.out.println("Echo records processed = " + recordsProcessed);
+                        }
+                    }
+
+                    events.computeIfAbsent(topic, t -> new HashMap<>())
+                          .computeIfAbsent(key, k -> new LinkedList<>())
+                          .add(record);
+                }
+
+                System.out.println(processed);
+            }
+        }
+        consumer.close();
+        final long finished = System.currentTimeMillis() - start;
+        System.out.println("Verification time=" + finished);
+        System.out.println("-------------------");
+        System.out.println("Result Verification");
+        System.out.println("-------------------");
+        System.out.println("recordGenerated=" + recordsGenerated);
+        System.out.println("recordProcessed=" + recordsProcessed);
+
+        if (recordsProcessed > recordsGenerated) {
+            System.out.println("PROCESSED-MORE-THAN-GENERATED");
+        } else if (recordsProcessed < recordsGenerated) {
+            System.out.println("PROCESSED-LESS-THAN-GENERATED");
+        }
+
+        boolean success;
+
+        final Map<String, Set<Number>> received =
+            events.get("echo")
+                  .entrySet()
+                  .stream()
+                  .map(entry -> mkEntry(
+                      entry.getKey(),
+                      entry.getValue().stream().map(ConsumerRecord::value).collect(Collectors.toSet()))
+                  )
+                  .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+
+        success = inputs.equals(received);
+
+        if (success) {
+            System.out.println("ALL-RECORDS-DELIVERED");
+        } else {
+            int missedCount = 0;
+            for (final Map.Entry<String, Set<Integer>> entry : inputs.entrySet()) {
+                missedCount += received.get(entry.getKey()).size();
+            }
+            System.out.println("missedRecords=" + missedCount);
+        }
+
+        // give it one more try if it's not already passing.
+        if (!verificationResult.passed()) {
+            verificationResult = verifyAll(inputs, events, true);
+        }
+        success &= verificationResult.passed();
+
+        System.out.println(verificationResult.result());
+
+        System.out.println(success ? "SUCCESS" : "FAILURE");
+        return verificationResult;
+    }
+
+    /**
+     * Immutable outcome of a verification run: a pass/fail flag plus the textual report that was
+     * collected while verifying.
+     */
+    public static class VerificationResult {
+        private final boolean success;
+        private final String report;
+
+        VerificationResult(final boolean passed, final String result) {
+            success = passed;
+            report = result;
+        }
+
+        /**
+         * @return whether the verification passed
+         */
+        public boolean passed() {
+            return success;
+        }
+
+        /**
+         * @return the report text given at construction
+         */
+        public String result() {
+            return report;
+        }
+    }
+
+    private static VerificationResult verifyAll(final Map<String, Set<Integer>> inputs,
+                                                final Map<String, Map<String, LinkedList<ConsumerRecord<String, Number>>>> events,
+                                                final boolean printResults) {
+        final ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+        boolean pass;
+        try (final PrintStream resultStream = new PrintStream(byteArrayOutputStream)) {
+            pass = verifyTAgg(resultStream, inputs, events.get("tagg"), printResults);
+            pass &= verifySuppressed(resultStream, "min-suppressed", events, printResults);
+            pass &= verify(resultStream, "min-suppressed", inputs, events, windowedKey -> {
+                final String unwindowedKey = windowedKey.substring(1, windowedKey.length() - 1).replaceAll("@.*", "");
+                return getMin(unwindowedKey);
+            }, printResults);
+            pass &= verifySuppressed(resultStream, "sws-suppressed", events, printResults);
+            pass &= verify(resultStream, "min", inputs, events, SmokeTestDriver::getMin, printResults);
+            pass &= verify(resultStream, "max", inputs, events, SmokeTestDriver::getMax, printResults);
+            pass &= verify(resultStream, "dif", inputs, events, key -> getMax(key).intValue() - getMin(key).intValue(), printResults);
+            pass &= verify(resultStream, "sum", inputs, events, SmokeTestDriver::getSum, printResults);
+            pass &= verify(resultStream, "cnt", inputs, events, key1 -> getMax(key1).intValue() - getMin(key1).intValue() + 1L, printResults);
+            pass &= verify(resultStream, "avg", inputs, events, SmokeTestDriver::getAvg, printResults);
+        }
+        return new VerificationResult(pass, new String(byteArrayOutputStream.toByteArray(), StandardCharsets.UTF_8));
+    }
+
+    /**
+     * Checks one output topic: it must contain exactly as many keys as the input, and the last
+     * record of every key must equal the value expected for that key.
+     *
+     * @param resultStream     receives the progress and failure report
+     * @param topic            the output topic to check
+     * @param inputData        the generated data: key to the set of values produced for it
+     * @param events           consumed records, by topic and then by key
+     * @param keyToExpectation computes the expected final value from an output key
+     * @param printResults     whether a failure should also dump the related events
+     * @return true if the topic is non-empty and every key ends on its expected value
+     */
+    private static boolean verify(final PrintStream resultStream,
+                                  final String topic,
+                                  final Map<String, Set<Integer>> inputData,
+                                  final Map<String, Map<String, LinkedList<ConsumerRecord<String, Number>>>> events,
+                                  final Function<String, Number> keyToExpectation,
+                                  final boolean printResults) {
+        // "data" may not have been consumed at all, so never assume the map is present
+        final Map<String, LinkedList<ConsumerRecord<String, Number>>> observedInputEvents = events.getOrDefault("data", emptyMap());
+        final Map<String, LinkedList<ConsumerRecord<String, Number>>> outputEvents = events.getOrDefault(topic, emptyMap());
+        if (outputEvents.isEmpty()) {
+            resultStream.println(topic + " is empty");
+            return false;
+        } else {
+            resultStream.printf("verifying %s with %d keys%n", topic, outputEvents.size());
+
+            if (outputEvents.size() != inputData.size()) {
+                resultStream.printf("fail: resultCount=%d expectedCount=%s%n\tresult=%s%n\texpected=%s%n",
+                                    outputEvents.size(), inputData.size(), outputEvents.keySet(), inputData.keySet());
+                return false;
+            }
+            for (final Map.Entry<String, LinkedList<ConsumerRecord<String, Number>>> entry : outputEvents.entrySet()) {
+                final String key = entry.getKey();
+                final Number expected = keyToExpectation.apply(key);
+                // only the latest record of a key counts; earlier ones are intermediate results
+                final Number actual = entry.getValue().getLast().value();
+                if (!expected.equals(actual)) {
+                    resultStream.printf("%s fail: key=%s actual=%s expected=%s%n", topic, key, actual, expected);
+
+                    if (printResults) {
+                        // Output keys need not exist in "data" (e.g. windowed keys), so fall back
+                        // to an empty list rather than failing inside the diagnostics.
+                        resultStream.printf("\t inputEvents=%n%s%n\t" +
+                                "echoEvents=%n%s%n\tmaxEvents=%n%s%n\tminEvents=%n%s%n\tdifEvents=%n%s%n\tcntEvents=%n%s%n\ttaggEvents=%n%s%n",
+                            indent("\t\t", observedInputEvents.getOrDefault(key, new LinkedList<>())),
+                            indent("\t\t", events.getOrDefault("echo", emptyMap()).getOrDefault(key, new LinkedList<>())),
+                            indent("\t\t", events.getOrDefault("max", emptyMap()).getOrDefault(key, new LinkedList<>())),
+                            indent("\t\t", events.getOrDefault("min", emptyMap()).getOrDefault(key, new LinkedList<>())),
+                            indent("\t\t", events.getOrDefault("dif", emptyMap()).getOrDefault(key, new LinkedList<>())),
+                            indent("\t\t", events.getOrDefault("cnt", emptyMap()).getOrDefault(key, new LinkedList<>())),
+                            indent("\t\t", events.getOrDefault("tagg", emptyMap()).getOrDefault(key, new LinkedList<>())));
+
+                        // topics not covered by the dump above get their own events appended
+                        if (!Utils.mkSet("echo", "max", "min", "dif", "cnt", "tagg").contains(topic))
+                            resultStream.printf("%sEvents=%n%s%n", topic, indent("\t\t", entry.getValue()));
+                    }
+
+                    return false;
+                }
+            }
+            return true;
+        }
+    }
+
+
+    /**
+     * Checks that a suppressed topic holds exactly one record per key. A topic without any events
+     * passes this check.
+     *
+     * @param resultStream receives the progress and failure report
+     * @param topic        the suppressed topic; its "-raw" counterpart is used for diagnostics
+     * @param events       consumed records, by topic and then by key
+     * @param printResults whether a failure should also dump the raw results and the input events
+     * @return false as soon as a key with a record count other than one is found, otherwise true
+     */
+    private static boolean verifySuppressed(final PrintStream resultStream,
+                                            @SuppressWarnings("SameParameterValue") final String topic,
+                                            final Map<String, Map<String, LinkedList<ConsumerRecord<String, Number>>>> events,
+                                            final boolean printResults) {
+        resultStream.println("verifying suppressed " + topic);
+        final Map<String, LinkedList<ConsumerRecord<String, Number>>> topicEvents = events.getOrDefault(topic, emptyMap());
+        for (final Map.Entry<String, LinkedList<ConsumerRecord<String, Number>>> entry : topicEvents.entrySet()) {
+            if (entry.getValue().size() != 1) {
+                final String unsuppressedTopic = topic.replace("-suppressed", "-raw");
+                final String key = entry.getKey();
+                // drop the first and last character and everything from '@' on to get the plain key
+                final String unwindowedKey = key.substring(1, key.length() - 1).replaceAll("@.*", "");
+                resultStream.printf("fail: key=%s%n\tnon-unique result:%n%s%n",
+                                    key,
+                                    indent("\t\t", entry.getValue()));
+
+                if (printResults) {
+                    // The raw topic or the input may lack events (for this key or altogether);
+                    // print an empty section then rather than failing inside the diagnostics.
+                    resultStream.printf("\tresultEvents:%n%s%n\tinputEvents:%n%s%n",
+                        indent("\t\t", events.getOrDefault(unsuppressedTopic, emptyMap()).getOrDefault(key, new LinkedList<>())),
+                        indent("\t\t", events.getOrDefault("data", emptyMap()).getOrDefault(unwindowedKey, new LinkedList<>())));
+                }
+
+                return false;
+            }
+        }
+        return true;
+    }
+
+    /**
+     * Renders the records one per line, each line starting with {@code prefix} and ending with a
+     * newline character.
+     *
+     * @param prefix text put in front of every record
+     * @param list   the records to render, in iteration order
+     * @return the rendered lines, or an empty string if there are no records
+     */
+    private static String indent(@SuppressWarnings("SameParameterValue") final String prefix,
+                                 final Iterable<ConsumerRecord<String, Number>> list) {
+        final StringBuilder rendered = new StringBuilder();
+        list.forEach(record -> rendered.append(prefix).append(record).append('\n'));
+        return rendered.toString();
+    }
+
+    /**
+     * @param key a key of the form {@code "<min>-<max>"}
+     * @return the sum of all integers from min to max inclusive, computed in long arithmetic
+     */
+    private static Long getSum(final String key) {
+        final int lowest = getMin(key).intValue();
+        final int highest = getMax(key).intValue();
+        // arithmetic series: (first + last) * count / 2
+        final long firstPlusLast = (long) lowest + highest;
+        final long count = highest - lowest + 1L;
+        return firstPlusLast * count / 2L;
+    }
+
+    /**
+     * @param key a key of the form {@code "<min>-<max>"}
+     * @return the midpoint of min and max, which is the average of all integers between them
+     */
+    private static Double getAvg(final String key) {
+        final int lowest = getMin(key).intValue();
+        final int highest = getMax(key).intValue();
+        // widen before adding so the sum cannot overflow int
+        final long firstPlusLast = (long) lowest + highest;
+        return firstPlusLast / 2.0;
+    }
+
+
+    /**
+     * Checks the "tagg" topic, whose keys are record counts (as strings) and whose values say how
+     * many input keys have that count. The last value of every observed tagg key must match the
+     * number derived from the input keys; a tagg key that no input key maps to is expected to end
+     * at 0.
+     *
+     * NOTE(review): expected counts that never show up in {@code taggEvents} are not reported as a
+     * failure here -- confirm whether that is intended.
+     *
+     * @param resultStream receives the progress and failure report
+     * @param allData      the generated data; only its keys are used
+     * @param taggEvents   the consumed "tagg" records by key, or null if none were consumed
+     * @param printResults whether a failure should also dump the tagg events of the failing key
+     * @return true if tagg events exist and every observed key ends on its expected value
+     */
+    private static boolean verifyTAgg(final PrintStream resultStream,
+                                      final Map<String, Set<Integer>> allData,
+                                      final Map<String, LinkedList<ConsumerRecord<String, Number>>> taggEvents,
+                                      final boolean printResults) {
+        if (taggEvents == null) {
+            resultStream.println("tagg is missing");
+            return false;
+        }
+        if (taggEvents.isEmpty()) {
+            resultStream.println("tagg is empty");
+            return false;
+        }
+
+        resultStream.println("verifying tagg");
+
+        // expected answer: for every distinct record count, the number of keys having that count
+        final Map<String, Long> keysPerCount = new HashMap<>();
+        for (final String inputKey : allData.keySet()) {
+            final int min = getMin(inputKey).intValue();
+            final int max = getMax(inputKey).intValue();
+            final String cnt = Long.toString(max - min + 1L);
+            keysPerCount.merge(cnt, 1L, Long::sum);
+        }
+
+        // compare the latest value of each observed key with the expectation
+        for (final Map.Entry<String, LinkedList<ConsumerRecord<String, Number>>> observed : taggEvents.entrySet()) {
+            final String key = observed.getKey();
+            final LinkedList<ConsumerRecord<String, Number>> history = observed.getValue();
+
+            final Long expectedOrNull = keysPerCount.remove(key);
+            final long expectedCount = expectedOrNull == null ? 0L : expectedOrNull;
+
+            final long actualCount = history.getLast().value().longValue();
+            if (actualCount != expectedCount) {
+                resultStream.println("fail: key=" + key + " tagg=" + history + " expected=" + expectedCount);
+
+                if (printResults) {
+                    resultStream.println("\t taggEvents: " + history);
+                }
+                return false;
+            }
+        }
+        return true;
+    }
+
+    private static Number getMin(final String key) {
+        return Integer.parseInt(key.split("-")[0]);
+    }
+
+    private static Number getMax(final String key) {
+        return Integer.parseInt(key.split("-")[1]);
+    }
+
+    /**
+     * Looks up the partitions of each given topic through the consumer.
+     *
+     * @param consumer the consumer used to query partition metadata
+     * @param topics   the topics to look up, in the order their partitions should appear
+     * @return a new list with one TopicPartition per partition reported for the topics
+     */
+    private static List<TopicPartition> getAllPartitions(final KafkaConsumer<?, ?> consumer, final String... topics) {
+        return Arrays.stream(topics)
+                     .flatMap(topic -> consumer.partitionsFor(topic).stream())
+                     .map(info -> new TopicPartition(info.topic(), info.partition()))
+                     .collect(Collectors.toCollection(ArrayList::new));
+    }
+
+}
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java b/streams/upgrade-system-tests-20/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java
similarity index 65%
copy from streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java
copy to streams/upgrade-system-tests-20/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java
index 90e6ccd..2dd2577 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java
+++ b/streams/upgrade-system-tests-20/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java
@@ -27,12 +27,13 @@ import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-import java.io.File;
 import java.time.Instant;
 
 public class SmokeTestUtil {
-
+    private static final Logger LOG = LoggerFactory.getLogger(SmokeTestUtil.class);
     final static int END = Integer.MAX_VALUE;
 
     static ProcessorSupplier<Object, Object> printProcessorSupplier(final String topic) {
@@ -45,21 +46,50 @@ public class SmokeTestUtil {
             public Processor<Object, Object> get() {
                 return new AbstractProcessor<Object, Object>() {
                     private int numRecordsProcessed = 0;
+                    private long smallestOffset = Long.MAX_VALUE;
+                    private long largestOffset = Long.MIN_VALUE;
 
                     @Override
                     public void init(final ProcessorContext context) {
                         super.init(context);
+                        LOG.info("[DEV] initializing processor: topic=" + topic + " taskId=" + context.taskId());
                         System.out.println("[DEV] initializing processor: topic=" + topic + " taskId=" + context.taskId());
+                        System.out.flush();
                         numRecordsProcessed = 0;
+                        smallestOffset = Long.MAX_VALUE;
+                        largestOffset = Long.MIN_VALUE;
                     }
 
                     @Override
                     public void process(final Object key, final Object value) {
                         numRecordsProcessed++;
                         if (numRecordsProcessed % 100 == 0) {
+                            LOG.info("processed " + numRecordsProcessed + " records from topic=" + topic);
                             System.out.printf("%s: %s%n", name, Instant.now());
                             System.out.println("processed " + numRecordsProcessed + " records from topic=" + topic);
                         }
+
+                        if (smallestOffset > context().offset()) {
+                            smallestOffset = context().offset();
+                        }
+                        if (largestOffset < context().offset()) {
+                            largestOffset = context().offset();
+                        }
+                    }
+
+                    @Override
+                    public void close() {
+                        // announce the close on both the logger and stdout
+                        final Object taskId = context().taskId();
+                        LOG.info("Close processor for task {}", taskId);
+                        System.out.printf("Close processor for task %s%n", taskId);
+                        System.out.println("processed " + numRecordsProcessed + " records");
+
+                        // if no offset was ever recorded the sentinels leave largest < smallest,
+                        // and the offset span is reported as zero
+                        final long processed = largestOffset < smallestOffset
+                            ? 0L
+                            : 1L + largestOffset - smallestOffset;
+                        System.out.println("offset " + smallestOffset + " to " + largestOffset + " -> processed " + processed);
+                        System.out.flush();
+                    }
                 };
             }
@@ -76,39 +106,19 @@ public class SmokeTestUtil {
     public static class Agg {
 
         KeyValueMapper<String, Long, KeyValue<String, Long>> selector() {
-            return new KeyValueMapper<String, Long, KeyValue<String, Long>>() {
-                @Override
-                public KeyValue<String, Long> apply(final String key, final Long value) {
-                    return new KeyValue<>(value == null ? null : Long.toString(value), 1L);
-                }
-            };
+            return (key, value) -> new KeyValue<>(value == null ? null : Long.toString(value), 1L);
         }
 
         public Initializer<Long> init() {
-            return new Initializer<Long>() {
-                @Override
-                public Long apply() {
-                    return 0L;
-                }
-            };
+            return () -> 0L;
         }
 
         Aggregator<String, Long, Long> adder() {
-            return new Aggregator<String, Long, Long>() {
-                @Override
-                public Long apply(final String aggKey, final Long value, final Long aggregate) {
-                    return aggregate + value;
-                }
-            };
+            return (aggKey, value, aggregate) -> aggregate + value;
         }
 
         Aggregator<String, Long, Long> remover() {
-            return new Aggregator<String, Long, Long>() {
-                @Override
-                public Long apply(final String aggKey, final Long value, final Long aggregate) {
-                    return aggregate - value;
-                }
-            };
+            return (aggKey, value, aggregate) -> aggregate - value;
         }
     }
 
@@ -120,14 +130,6 @@ public class SmokeTestUtil {
 
     static Serde<Double> doubleSerde = Serdes.Double();
 
-    static File createDir(final File parent, final String child) {
-        final File dir = new File(parent, child);
-
-        dir.mkdir();
-
-        return dir;
-    }
-
     public static void sleep(final long duration) {
         try {
             Thread.sleep(duration);
diff --git a/streams/upgrade-system-tests-20/src/test/java/org/apache/kafka/streams/tests/StreamsSmokeTest.java b/streams/upgrade-system-tests-20/src/test/java/org/apache/kafka/streams/tests/StreamsSmokeTest.java
new file mode 100644
index 0000000..07c7d5d
--- /dev/null
+++ b/streams/upgrade-system-tests-20/src/test/java/org/apache/kafka/streams/tests/StreamsSmokeTest.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.tests;
+
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.StreamsConfig;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.UUID;
+
+import static org.apache.kafka.streams.tests.SmokeTestDriver.generate;
+import static org.apache.kafka.streams.tests.SmokeTestDriver.generatePerpetually;
+
+public class StreamsSmokeTest {
+
+    /**
+     * Entry point for both the smoke-test driver and the smoke-test Streams client.
+     *
+     * <pre>
+     *  args ::= propFileName command [disableAutoTerminate]
+     *  command := "run" | "process"
+     * </pre>
+     *
+     * {@code run} starts the driver: data generation and, unless auto-termination is disabled,
+     * result verification. {@code process} starts the stream processing app. Auto-termination
+     * is disabled by the mere presence of a third argument; its value is not inspected.
+     * Invalid arguments or configuration are reported on stderr and terminate the JVM with
+     * exit status 1.
+     *
+     * @param args the properties file name, the command, and an optional third argument
+     * @throws IOException declared for the loading of the properties file
+     */
+    public static void main(final String[] args) throws IOException {
+        if (args.length < 2) {
+            System.err.println("StreamsSmokeTest expects at least two parameters: propFile, command; but only saw " + args.length + " parameter(s)");
+            System.exit(1);
+        }
+
+        final String propFileName = args[0];
+        final String command = args[1];
+        final boolean disableAutoTerminate = args.length > 2;
+
+        final Properties streamsProperties = Utils.loadProps(propFileName);
+        final String kafka = streamsProperties.getProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG);
+        final String processingGuarantee = streamsProperties.getProperty(StreamsConfig.PROCESSING_GUARANTEE_CONFIG);
+
+        if (kafka == null) {
+            System.err.println("No bootstrap kafka servers specified in " + StreamsConfig.BOOTSTRAP_SERVERS_CONFIG);
+            System.exit(1);
+        }
+
+        // the processing guarantee is only relevant to (and only validated for) the Streams client
+        if ("process".equals(command)) {
+            if (!StreamsConfig.AT_LEAST_ONCE.equals(processingGuarantee) &&
+                !StreamsConfig.EXACTLY_ONCE.equals(processingGuarantee)) {
+
+                System.err.println("processingGuarantee must be either " + StreamsConfig.AT_LEAST_ONCE + " or " +
+                    StreamsConfig.EXACTLY_ONCE);
+
+                System.exit(1);
+            }
+        }
+
+        System.out.println("StreamsTest instance started (StreamsSmokeTest)");
+        System.out.println("command=" + command);
+        System.out.println("props=" + streamsProperties);
+        System.out.println("disableAutoTerminate=" + disableAutoTerminate);
+
+        switch (command) {
+            case "run":
+                // this starts the driver (data generation and result verification)
+                final int numKeys = 10;
+                final int maxRecordsPerKey = 500;
+                if (disableAutoTerminate) {
+                    generatePerpetually(kafka, numKeys, maxRecordsPerKey);
+                } else {
+                    // slow down data production to span 30 seconds so that system tests have time to
+                    // do their bounces, etc.
+                    final Map<String, Set<Integer>> allData =
+                        generate(kafka, numKeys, maxRecordsPerKey, Duration.ofSeconds(30));
+                    SmokeTestDriver.verify(kafka, allData, maxRecordsPerKey);
+                }
+                break;
+            case "process":
+                // this starts the stream processing app
+                new SmokeTestClient(UUID.randomUUID().toString()).start(streamsProperties);
+                break;
+            default:
+                // an unrecognized command is a usage error: report it like the other
+                // validation failures above instead of exiting successfully
+                System.err.println("unknown command: " + command);
+                System.exit(1);
+        }
+    }
+
+}
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java b/streams/upgrade-system-tests-21/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java
similarity index 79%
copy from streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java
copy to streams/upgrade-system-tests-21/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java
index db243fd..ced1369 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java
+++ b/streams/upgrade-system-tests-21/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java
@@ -16,11 +16,10 @@
  */
 package org.apache.kafka.streams.tests;
 
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Bytes;
-import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.KafkaThread;
+import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
@@ -38,11 +37,15 @@ import org.apache.kafka.streams.kstream.TimeWindows;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.state.Stores;
 import org.apache.kafka.streams.state.WindowStore;
-import org.apache.kafka.test.TestUtils;
 
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
 import java.time.Duration;
 import java.time.Instant;
 import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 
 import static org.apache.kafka.streams.kstream.Suppressed.untilWindowCloses;
 
@@ -50,14 +53,42 @@ public class SmokeTestClient extends SmokeTestUtil {
 
     private final String name;
 
-    private Thread thread;
     private KafkaStreams streams;
     private boolean uncaughtException = false;
     private boolean started;
-    private boolean closed;
+    private volatile boolean closed;
+
+    /**
+     * Registers {@code runnable} as a JVM shutdown hook.
+     *
+     * @param name     thread name for the hook; when {@code null} a plain unnamed {@link Thread} is used,
+     *                 otherwise the hook thread is obtained from {@code KafkaThread.nonDaemon}
+     * @param runnable the work to run at shutdown
+     */
+    private static void addShutdownHook(final String name, final Runnable runnable) {
+        final Thread hook = name == null
+            ? new Thread(runnable)
+            : KafkaThread.nonDaemon(name, runnable);
+        Runtime.getRuntime().addShutdownHook(hook);
+    }
+
+    /**
+     * Creates a temporary directory with the prefix {@code "kafka-"} and schedules its removal
+     * at JVM exit, both via {@link File#deleteOnExit()} and via a shutdown hook that calls
+     * {@code Utils.delete}.
+     *
+     * @return the newly created directory
+     * @throws RuntimeException if the directory cannot be created
+     */
+    private static File tempDirectory() {
+        final File dir;
+        try {
+            dir = Files.createTempDirectory("kafka-").toFile();
+        } catch (final IOException ex) {
+            throw new RuntimeException("Failed to create a temp dir", ex);
+        }
+        dir.deleteOnExit();
+
+        // a failure to delete at shutdown is only reported on stdout, never rethrown
+        final Runnable cleanup = () -> {
+            try {
+                Utils.delete(dir);
+            } catch (final IOException e) {
+                System.out.println("Error deleting " + dir.getAbsolutePath());
+                e.printStackTrace(System.out);
+            }
+        };
+        addShutdownHook("delete-temp-file-shutdown-hook", cleanup);
+
+        return dir;
+    }
 
     public SmokeTestClient(final String name) {
-        super();
         this.name = name;
     }
 
@@ -70,17 +101,43 @@ public class SmokeTestClient extends SmokeTestUtil {
     }
 
     public void start(final Properties streamsProperties) {
-        streams = createKafkaStreams(streamsProperties);
+        final Topology build = getTopology();
+        streams = new KafkaStreams(build, getStreamsConfig(streamsProperties));
+
+        final CountDownLatch countDownLatch = new CountDownLatch(1);
+        streams.setStateListener((newState, oldState) -> {
+            System.out.printf("%s %s: %s -> %s%n", name, Instant.now(), oldState, newState);
+            if (oldState == KafkaStreams.State.REBALANCING && newState == KafkaStreams.State.RUNNING) {
+                started = true;
+                countDownLatch.countDown();
+            }
+
+            if (newState == KafkaStreams.State.NOT_RUNNING) {
+                closed = true;
+            }
+        });
+
         streams.setUncaughtExceptionHandler((t, e) -> {
             System.out.println(name + ": SMOKE-TEST-CLIENT-EXCEPTION");
+            System.out.println(name + ": FATAL: An unexpected exception is encountered on thread " + t + ": " + e);
+            e.printStackTrace(System.out);
             uncaughtException = true;
-            e.printStackTrace();
+            streams.close(Duration.ofSeconds(30));
         });
 
-        Exit.addShutdownHook("streams-shutdown-hook", () -> close());
+        addShutdownHook("streams-shutdown-hook", this::close);
 
-        thread = new Thread(() -> streams.start());
-        thread.start();
+        streams.start();
+        try {
+            if (!countDownLatch.await(1, TimeUnit.MINUTES)) {
+                System.out.println(name + ": SMOKE-TEST-CLIENT-EXCEPTION: Didn't start in one minute");
+            }
+        } catch (final InterruptedException e) {
+            System.out.println(name + ": SMOKE-TEST-CLIENT-EXCEPTION: " + e);
+            e.printStackTrace(System.out);
+        }
+        System.out.println(name + ": SMOKE-TEST-CLIENT-STARTED");
+        System.out.println(name + " started at " + Instant.now());
     }
 
     public void closeAsync() {
@@ -88,17 +145,14 @@ public class SmokeTestClient extends SmokeTestUtil {
     }
 
     public void close() {
-        streams.close(Duration.ofSeconds(5));
-        // do not remove these printouts since they are needed for health scripts
-        if (!uncaughtException) {
+        final boolean closed = streams.close(Duration.ofMinutes(1));
+
+        if (closed && !uncaughtException) {
             System.out.println(name + ": SMOKE-TEST-CLIENT-CLOSED");
-        }
-        try {
-            thread.join();
-        } catch (final Exception ex) {
-            // do not remove these printouts since they are needed for health scripts
+        } else if (closed) {
             System.out.println(name + ": SMOKE-TEST-CLIENT-EXCEPTION");
-            // ignore
+        } else {
+            System.out.println(name + ": SMOKE-TEST-CLIENT-EXCEPTION: Didn't close");
         }
     }
 
@@ -106,39 +160,11 @@ public class SmokeTestClient extends SmokeTestUtil {
         final Properties fullProps = new Properties(props);
         fullProps.put(StreamsConfig.APPLICATION_ID_CONFIG, "SmokeTest");
         fullProps.put(StreamsConfig.CLIENT_ID_CONFIG, "SmokeTest-" + name);
-        fullProps.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 3);
-        fullProps.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 2);
-        fullProps.put(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, 100);
-        fullProps.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
-        fullProps.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3);
-        fullProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-        fullProps.put(ProducerConfig.ACKS_CONFIG, "all");
-        fullProps.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
+        fullProps.put(StreamsConfig.STATE_DIR_CONFIG, tempDirectory().getAbsolutePath());
         fullProps.putAll(props);
         return fullProps;
     }
 
-    private KafkaStreams createKafkaStreams(final Properties props) {
-        final Topology build = getTopology();
-        final KafkaStreams streamsClient = new KafkaStreams(build, getStreamsConfig(props));
-        streamsClient.setStateListener((newState, oldState) -> {
-            System.out.printf("%s %s: %s -> %s%n", name, Instant.now(), oldState, newState);
-            if (oldState == KafkaStreams.State.REBALANCING && newState == KafkaStreams.State.RUNNING) {
-                started = true;
-            }
-
-            if (newState == KafkaStreams.State.NOT_RUNNING) {
-                closed = true;
-            }
-        });
-        streamsClient.setUncaughtExceptionHandler((t, e) -> {
-            System.out.println(name + ": FATAL: An unexpected exception is encountered on thread " + t + ": " + e);
-            streamsClient.close(Duration.ofSeconds(30));
-        });
-
-        return streamsClient;
-    }
-
     public Topology getTopology() {
         final StreamsBuilder builder = new StreamsBuilder();
         final Consumed<String, Integer> stringIntConsumed = Consumed.with(stringSerde, intSerde);
diff --git a/streams/upgrade-system-tests-21/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java b/streams/upgrade-system-tests-21/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
new file mode 100644
index 0000000..d0a7d22
--- /dev/null
+++ b/streams/upgrade-system-tests-21/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
@@ -0,0 +1,632 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.tests;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
+
+import java.io.ByteArrayOutputStream;
+import java.io.PrintStream;
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static java.util.Collections.emptyMap;
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+
+public class SmokeTestDriver extends SmokeTestUtil {
+    private static final String[] TOPICS = {
+        "data",
+        "echo",
+        "max",
+        "min", "min-suppressed", "min-raw",
+        "dif",
+        "sum",
+        "sws-raw", "sws-suppressed",
+        "cnt",
+        "avg",
+        "tagg"
+    };
+
+    private static final int MAX_RECORD_EMPTY_RETRIES = 30;
+
+    /**
+     * The consecutive integers {@code min..max} for one key, handed out one at a time in a
+     * locally shuffled order. The key is the string {@code "<min>-<max>"}.
+     */
+    private static class ValueList {
+        public final String key;
+        private final int[] values;
+        private int index;
+
+        ValueList(final int min, final int max) {
+            this.key = min + "-" + max;
+
+            final int count = max - min + 1;
+            this.values = new int[count];
+            int nextValue = min;
+            for (int slot = 0; slot < count; slot++) {
+                values[slot] = nextValue++;
+            }
+            // The order of the data is randomized so that the processing order is not completely predictable.
+            // However, values are also used as the timestamp of the record. (TODO: separate data and timestamp)
+            // To keep some correlation between time and order, the shuffle uses a sliding window.
+            shuffle(values, 10);
+
+            this.index = 0;
+        }
+
+        /** Returns the next value, or -1 once every value has been handed out. */
+        int next() {
+            if (index >= values.length) {
+                return -1;
+            }
+            final int value = values[index];
+            index++;
+            return value;
+        }
+    }
+
+    /**
+     * Returns the topic names used by the smoke test.
+     *
+     * @return a fresh copy of the topic array, so callers cannot modify the shared constant
+     */
+    public static String[] topics() {
+        return TOPICS.clone();
+    }
+
+    /**
+     * Produces records to the "data" topic in an endless loop, pausing 2 ms between sends.
+     * Each record uses a randomly chosen key and that key's next value. Once a key's values
+     * are exhausted, {@code ValueList.next()} yields -1 and that value is sent as-is.
+     *
+     * @param kafka            bootstrap servers for the producer
+     * @param numKeys          number of distinct keys to generate data for
+     * @param maxRecordsPerKey number of distinct values prepared per key
+     */
+    static void generatePerpetually(final String kafka,
+                                    final int numKeys,
+                                    final int maxRecordsPerKey) {
+        final ValueList[] valueLists = new ValueList[numKeys];
+        for (int k = 0; k < numKeys; k++) {
+            valueLists[k] = new ValueList(k, k + maxRecordsPerKey - 1);
+        }
+
+        final Random random = new Random();
+        int produced = 0;
+
+        try (final KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(generatorProperties(kafka))) {
+            while (true) {
+                final ValueList chosen = valueLists[random.nextInt(numKeys)];
+                final byte[] keyBytes = stringSerde.serializer().serialize("", chosen.key);
+                final byte[] valueBytes = intSerde.serializer().serialize("", chosen.next());
+
+                final ProducerRecord<byte[], byte[]> record = new ProducerRecord<>("data", keyBytes, valueBytes);
+                producer.send(record);
+
+                produced++;
+                // progress report every 100 records
+                if (produced % 100 == 0) {
+                    System.out.println(Instant.now() + " " + produced + " records produced");
+                }
+                Utils.sleep(2);
+            }
+        }
+    }
+
+    /**
+     * Produces a finite data set to the "data" topic and returns what was handed to the producer.
+     *
+     * For each of {@code numKeys} keys a {@code ValueList} of {@code maxRecordsPerKey} values is
+     * prepared; records are then sent for randomly chosen keys until every list is exhausted.
+     * Records whose send completed with a timeout are re-sent in up to five retry rounds, and the
+     * process exits with status 1 if some still fail after the last round. Finally one "flush"
+     * record per partition of "data" is sent with a timestamp two days in the future.
+     *
+     * @param kafka            bootstrap servers for the producer
+     * @param numKeys          number of distinct keys to generate data for
+     * @param maxRecordsPerKey number of values generated per key
+     * @param timeToSpend      target duration over which the sends are spread
+     * @return an unmodifiable map from each key to the set of values sent for it
+     */
+    public static Map<String, Set<Integer>> generate(final String kafka,
+                                                     final int numKeys,
+                                                     final int maxRecordsPerKey,
+                                                     final Duration timeToSpend) {
+        final Properties producerProps = generatorProperties(kafka);
+
+
+        int numRecordsProduced = 0;
+
+        final Map<String, Set<Integer>> allData = new HashMap<>();
+        final ValueList[] data = new ValueList[numKeys];
+        for (int i = 0; i < numKeys; i++) {
+            data[i] = new ValueList(i, i + maxRecordsPerKey - 1);
+            allData.put(data[i].key, new HashSet<>());
+        }
+        final Random rand = new Random();
+
+        // number of value lists (at the front of the array) that still have values left
+        int remaining = data.length;
+
+        // integer division: may come out as 0, in which case the 2 ms floor below applies
+        final long recordPauseTime = timeToSpend.toMillis() / numKeys / maxRecordsPerKey;
+
+        // filled by the send callbacks with records whose send timed out
+        List<ProducerRecord<byte[], byte[]>> needRetry = new ArrayList<>();
+
+        try (final KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(producerProps)) {
+            while (remaining > 0) {
+                final int index = rand.nextInt(remaining);
+                final String key = data[index].key;
+                final int value = data[index].next();
+
+                if (value < 0) {
+                    // this list is exhausted: overwrite it with the last live list and shrink the live range
+                    remaining--;
+                    data[index] = data[remaining];
+                } else {
+
+                    final ProducerRecord<byte[], byte[]> record =
+                        new ProducerRecord<>(
+                            "data",
+                            stringSerde.serializer().serialize("", key),
+                            intSerde.serializer().serialize("", value)
+                        );
+
+                    producer.send(record, new TestCallback(record, needRetry));
+
+                    numRecordsProduced++;
+                    // the value is recorded as soon as it is handed to the producer
+                    allData.get(key).add(value);
+                    if (numRecordsProduced % 100 == 0) {
+                        System.out.println(Instant.now() + " " + numRecordsProduced + " records produced");
+                    }
+                    Utils.sleep(Math.max(recordPauseTime, 2));
+                }
+            }
+            // flush before inspecting needRetry, which is only read after this call
+            producer.flush();
+
+            int remainingRetries = 5;
+            while (!needRetry.isEmpty()) {
+                // records that time out again during this round are collected in a fresh list
+                final List<ProducerRecord<byte[], byte[]>> needRetry2 = new ArrayList<>();
+                for (final ProducerRecord<byte[], byte[]> record : needRetry) {
+                    System.out.println("retry producing " + stringSerde.deserializer().deserialize("", record.key()));
+                    producer.send(record, new TestCallback(record, needRetry2));
+                }
+                producer.flush();
+                needRetry = needRetry2;
+
+                // give up once the retry budget is used up and records are still outstanding
+                if (--remainingRetries == 0 && !needRetry.isEmpty()) {
+                    System.err.println("Failed to produce all records after multiple retries");
+                    Exit.exit(1);
+                }
+            }
+
+            // now that we've sent everything, we'll send some final records with a timestamp high enough to flush out
+            // all suppressed records.
+            final List<PartitionInfo> partitions = producer.partitionsFor("data");
+            for (final PartitionInfo partition : partitions) {
+                producer.send(new ProducerRecord<>(
+                    partition.topic(),
+                    partition.partition(),
+                    System.currentTimeMillis() + Duration.ofDays(2).toMillis(),
+                    stringSerde.serializer().serialize("", "flush"),
+                    intSerde.serializer().serialize("", 0)
+                ));
+            }
+        }
+        return Collections.unmodifiableMap(allData);
+    }
+
+    /**
+     * Builds the configuration for the data-generating producer: client id "SmokeTest",
+     * byte-array key and value serializers, and acks set to "all".
+     *
+     * @param kafka bootstrap servers for the producer
+     * @return a new {@link Properties} instance holding the producer configuration
+     */
+    private static Properties generatorProperties(final String kafka) {
+        final Properties config = new Properties();
+        // where to connect and how the client identifies itself
+        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
+        config.put(ProducerConfig.CLIENT_ID_CONFIG, "SmokeTest");
+        // keys and values are handed over already serialized
+        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
+        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
+        config.put(ProducerConfig.ACKS_CONFIG, "all");
+        return config;
+    }
+
+    /**
+     * Send callback that queues the record for a retry when the send timed out, and prints
+     * the stack trace and calls {@code Exit.exit(1)} on any other send failure.
+     */
+    private static class TestCallback implements Callback {
+        private final ProducerRecord<byte[], byte[]> originalRecord;
+        private final List<ProducerRecord<byte[], byte[]>> needRetry;
+
+        TestCallback(final ProducerRecord<byte[], byte[]> originalRecord,
+                     final List<ProducerRecord<byte[], byte[]>> needRetry) {
+            this.needRetry = needRetry;
+            this.originalRecord = originalRecord;
+        }
+
+        @Override
+        public void onCompletion(final RecordMetadata metadata, final Exception exception) {
+            // successful send: nothing to do
+            if (exception == null) {
+                return;
+            }
+            // timeouts are retried by the caller
+            if (exception instanceof TimeoutException) {
+                needRetry.add(originalRecord);
+                return;
+            }
+            exception.printStackTrace();
+            Exit.exit(1);
+        }
+    }
+
+    /**
+     * Shuffles {@code data} in place, swapping each element with a randomly chosen element at
+     * most {@code windowSize - 1} positions ahead of it.
+     *
+     * @param data       the array to shuffle in place
+     * @param windowSize upper bound (exclusive) on how far ahead a swap partner may be
+     */
+    private static void shuffle(final int[] data, @SuppressWarnings("SameParameterValue") final int windowSize) {
+        final Random random = new Random();
+        final int length = data.length;
+        for (int pos = 0; pos < length; pos++) {
+            // the window shrinks near the end of the array so the partner stays in bounds
+            final int span = Math.min(length - pos, windowSize);
+            final int partner = pos + random.nextInt(span);
+
+            final int picked = data[partner];
+            data[partner] = data[pos];
+            data[pos] = picked;
+        }
+    }
+
+    /**
+     * Deserializer that picks the numeric type by topic name: int for the raw and per-key
+     * topics, long for "sum", "cnt" and "tagg", and double for "avg".
+     */
+    public static class NumberDeserializer implements Deserializer<Number> {
+        @Override
+        public void configure(final Map<String, ?> configs, final boolean isKey) {
+            // nothing to configure
+        }
+
+        /**
+         * @param topic the topic the bytes were read from; selects the numeric type
+         * @param data  the serialized value
+         * @return the deserialized number
+         * @throws RuntimeException if {@code topic} is not one of the known topics
+         */
+        @Override
+        public Number deserialize(final String topic, final byte[] data) {
+            switch (topic) {
+                case "sum":
+                case "cnt":
+                case "tagg":
+                    return longSerde.deserializer().deserialize(topic, data);
+                case "avg":
+                    return doubleSerde.deserializer().deserialize(topic, data);
+                case "data":
+                case "echo":
+                case "min":
+                case "min-raw":
+                case "min-suppressed":
+                case "sws-raw":
+                case "sws-suppressed":
+                case "max":
+                case "dif":
+                    return intSerde.deserializer().deserialize(topic, data);
+                default:
+                    throw new RuntimeException("unknown topic: " + topic);
+            }
+        }
+
+        @Override
+        public void close() {
+            // nothing to release
+        }
+    }
+
+    /**
+     * Consumes every topic in {@code TOPICS} from the beginning and checks the Streams application's
+     * output against the generated input.
+     * <p>
+     * Records are polled with isolation level "read_committed" for at most six minutes. Whenever a poll
+     * comes back empty and at least as many "echo" records as were generated have been seen, the
+     * aggregations are verified; polling stops as soon as that verification passes or the empty-poll
+     * retries are exhausted. A summary is printed to stdout, ending in "SUCCESS" or "FAILURE".
+     *
+     * @param kafka            bootstrap servers for the verifying consumer
+     * @param inputs           the generated input: for every key, the set of values that were produced
+     * @param maxRecordsPerKey number of records generated per key; multiplied by the number of keys to
+     *                         obtain the expected record count
+     * @return the result of the last aggregation verification; the echo comparison only influences the
+     *         printed SUCCESS/FAILURE line, not the returned value
+     */
+    public static VerificationResult verify(final String kafka,
+                                            final Map<String, Set<Integer>> inputs,
+                                            final int maxRecordsPerKey) {
+        final Properties props = new Properties();
+        props.put(ConsumerConfig.CLIENT_ID_CONFIG, "verifier");
+        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
+        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, NumberDeserializer.class);
+        props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
+
+        final int recordsGenerated = inputs.size() * maxRecordsPerKey;
+        int recordsProcessed = 0;
+        final Map<String, AtomicInteger> processed =
+            Stream.of(TOPICS)
+                  .collect(Collectors.toMap(t -> t, t -> new AtomicInteger(0)));
+
+        // topic -> key -> all records seen for that key, in consumption order
+        final Map<String, Map<String, LinkedList<ConsumerRecord<String, Number>>>> events = new HashMap<>();
+
+        VerificationResult verificationResult = new VerificationResult(false, "no results yet");
+        int retry = 0;
+        final long start;
+        // try-with-resources: the consumer is closed even if polling or verification throws
+        try (final KafkaConsumer<String, Number> consumer = new KafkaConsumer<>(props)) {
+            final List<TopicPartition> partitions = getAllPartitions(consumer, TOPICS);
+            consumer.assign(partitions);
+            consumer.seekToBeginning(partitions);
+
+            start = System.currentTimeMillis();
+            while (System.currentTimeMillis() - start < TimeUnit.MINUTES.toMillis(6)) {
+                final ConsumerRecords<String, Number> records = consumer.poll(Duration.ofSeconds(5));
+                if (records.isEmpty() && recordsProcessed >= recordsGenerated) {
+                    verificationResult = verifyAll(inputs, events, false);
+                    if (verificationResult.passed()) {
+                        break;
+                    } else if (retry++ > MAX_RECORD_EMPTY_RETRIES) {
+                        System.out.println(Instant.now() + " Didn't get any more results, verification hasn't passed, and out of retries.");
+                        break;
+                    } else {
+                        System.out.println(Instant.now() + " Didn't get any more results, but verification hasn't passed (yet). Retrying..." + retry);
+                    }
+                } else {
+                    System.out.println(Instant.now() + " Get some more results from " + records.partitions() + ", resetting retry.");
+
+                    retry = 0;
+                    for (final ConsumerRecord<String, Number> record : records) {
+                        final String key = record.key();
+
+                        final String topic = record.topic();
+                        processed.get(topic).incrementAndGet();
+
+                        // progress is measured on the "echo" topic only
+                        if (topic.equals("echo")) {
+                            recordsProcessed++;
+                            if (recordsProcessed % 100 == 0) {
+                                System.out.println("Echo records processed = " + recordsProcessed);
+                            }
+                        }
+
+                        events.computeIfAbsent(topic, t -> new HashMap<>())
+                              .computeIfAbsent(key, k -> new LinkedList<>())
+                              .add(record);
+                    }
+
+                    System.out.println(processed);
+                }
+            }
+        }
+        final long finished = System.currentTimeMillis() - start;
+        System.out.println("Verification time=" + finished);
+        System.out.println("-------------------");
+        System.out.println("Result Verification");
+        System.out.println("-------------------");
+        System.out.println("recordGenerated=" + recordsGenerated);
+        System.out.println("recordProcessed=" + recordsProcessed);
+
+        if (recordsProcessed > recordsGenerated) {
+            System.out.println("PROCESSED-MORE-THAN-GENERATED");
+        } else if (recordsProcessed < recordsGenerated) {
+            System.out.println("PROCESSED-LESS-THAN-GENERATED");
+        }
+
+        // If nothing at all was echoed there is no "echo" entry; treat that as an empty result
+        // so that the failure is reported below instead of surfacing as a NullPointerException.
+        final Map<String, Set<Number>> received =
+            events.getOrDefault("echo", emptyMap())
+                  .entrySet()
+                  .stream()
+                  .map(entry -> mkEntry(
+                      entry.getKey(),
+                      entry.getValue().stream().map(ConsumerRecord::value).collect(Collectors.toSet()))
+                  )
+                  .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+
+        boolean success = inputs.equals(received);
+
+        if (success) {
+            System.out.println("ALL-RECORDS-DELIVERED");
+        } else {
+            // count the generated values that never showed up on the echo topic
+            int missedCount = 0;
+            for (final Map.Entry<String, Set<Integer>> entry : inputs.entrySet()) {
+                final Set<Number> receivedValues = received.get(entry.getKey());
+                for (final Integer value : entry.getValue()) {
+                    if (receivedValues == null || !receivedValues.contains(value)) {
+                        missedCount++;
+                    }
+                }
+            }
+            System.out.println("missedRecords=" + missedCount);
+        }
+
+        // give it one more try if it's not already passing.
+        if (!verificationResult.passed()) {
+            verificationResult = verifyAll(inputs, events, true);
+        }
+        success &= verificationResult.passed();
+
+        System.out.println(verificationResult.result());
+
+        System.out.println(success ? "SUCCESS" : "FAILURE");
+        return verificationResult;
+    }
+
+    /**
+     * Immutable outcome of a verification run: a pass/fail flag together with the textual report
+     * that was produced while verifying.
+     */
+    public static class VerificationResult {
+        private final boolean passed;
+        private final String result;
+
+        /**
+         * @param verificationPassed whether the verification succeeded
+         * @param report             the textual report of the verification
+         */
+        VerificationResult(final boolean verificationPassed, final String report) {
+            passed = verificationPassed;
+            result = report;
+        }
+
+        /** @return {@code true} if the verification succeeded */
+        public boolean passed() {
+            return this.passed;
+        }
+
+        /** @return the textual report of the verification */
+        public String result() {
+            return this.result;
+        }
+    }
+
+    /**
+     * Runs every per-topic check and collects the textual report they write.
+     * <p>
+     * The checks are combined with the non-short-circuit {@code &=}, so all of them run (and report)
+     * even after an earlier one has failed.
+     *
+     * @param inputs       the generated input: for every key, the set of values that were produced
+     * @param events       all consumed records, grouped by topic and then by key
+     * @param printResults whether failing checks should also dump the related events into the report
+     * @return the combined pass/fail flag and the report text
+     */
+    private static VerificationResult verifyAll(final Map<String, Set<Integer>> inputs,
+                                                final Map<String, Map<String, LinkedList<ConsumerRecord<String, Number>>>> events,
+                                                final boolean printResults) {
+        final ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+        boolean pass;
+        // Encode the report as UTF-8 explicitly: it is decoded as UTF-8 below, and the single-argument
+        // PrintStream constructor would use the platform default charset instead.
+        try (final PrintStream resultStream = new PrintStream(byteArrayOutputStream, false, StandardCharsets.UTF_8.name())) {
+            pass = verifyTAgg(resultStream, inputs, events.get("tagg"), printResults);
+            pass &= verifySuppressed(resultStream, "min-suppressed", events, printResults);
+            pass &= verify(resultStream, "min-suppressed", inputs, events, windowedKey -> {
+                // strip the surrounding brackets and the "@..." window suffix to get back the plain key
+                final String unwindowedKey = windowedKey.substring(1, windowedKey.length() - 1).replaceAll("@.*", "");
+                return getMin(unwindowedKey);
+            }, printResults);
+            pass &= verifySuppressed(resultStream, "sws-suppressed", events, printResults);
+            pass &= verify(resultStream, "min", inputs, events, SmokeTestDriver::getMin, printResults);
+            pass &= verify(resultStream, "max", inputs, events, SmokeTestDriver::getMax, printResults);
+            pass &= verify(resultStream, "dif", inputs, events, key -> getMax(key).intValue() - getMin(key).intValue(), printResults);
+            pass &= verify(resultStream, "sum", inputs, events, SmokeTestDriver::getSum, printResults);
+            pass &= verify(resultStream, "cnt", inputs, events, key1 -> getMax(key1).intValue() - getMin(key1).intValue() + 1L, printResults);
+            pass &= verify(resultStream, "avg", inputs, events, SmokeTestDriver::getAvg, printResults);
+        } catch (final java.io.UnsupportedEncodingException e) {
+            // cannot happen: every Java platform is required to support UTF-8
+            throw new IllegalStateException("UTF-8 charset is not available", e);
+        }
+        return new VerificationResult(pass, new String(byteArrayOutputStream.toByteArray(), StandardCharsets.UTF_8));
+    }
+
+    /**
+     * Checks that, for every key of {@code topic}, the last record's value equals the expected value
+     * derived from the key.
+     * <p>
+     * Verification fails if the topic has no events, if the number of result keys differs from the
+     * number of input keys, or at the first key whose latest value is not the expected one.
+     *
+     * @param resultStream     sink for the textual report
+     * @param topic            the output topic to verify
+     * @param inputData        the generated input: for every key, the set of values that were produced
+     * @param events           all consumed records, grouped by topic and then by key
+     * @param keyToExpectation computes the expected final value from a result key
+     * @param printResults     whether a failing key should also dump the related events into the report
+     * @return {@code true} if every key of the topic carries the expected final value
+     */
+    private static boolean verify(final PrintStream resultStream,
+                                  final String topic,
+                                  final Map<String, Set<Integer>> inputData,
+                                  final Map<String, Map<String, LinkedList<ConsumerRecord<String, Number>>>> events,
+                                  final Function<String, Number> keyToExpectation,
+                                  final boolean printResults) {
+        // "data" may be absent, and windowed result keys never match a "data" key; default to empty
+        // collections so the diagnostics below cannot fail with a NullPointerException.
+        final Map<String, LinkedList<ConsumerRecord<String, Number>>> observedInputEvents = events.getOrDefault("data", emptyMap());
+        final Map<String, LinkedList<ConsumerRecord<String, Number>>> outputEvents = events.getOrDefault(topic, emptyMap());
+        if (outputEvents.isEmpty()) {
+            resultStream.println(topic + " is empty");
+            return false;
+        } else {
+            resultStream.printf("verifying %s with %d keys%n", topic, outputEvents.size());
+
+            if (outputEvents.size() != inputData.size()) {
+                resultStream.printf("fail: resultCount=%d expectedCount=%s%n\tresult=%s%n\texpected=%s%n",
+                                    outputEvents.size(), inputData.size(), outputEvents.keySet(), inputData.keySet());
+                return false;
+            }
+            for (final Map.Entry<String, LinkedList<ConsumerRecord<String, Number>>> entry : outputEvents.entrySet()) {
+                final String key = entry.getKey();
+                final Number expected = keyToExpectation.apply(key);
+                // only the most recent update for a key is compared
+                final Number actual = entry.getValue().getLast().value();
+                if (!expected.equals(actual)) {
+                    resultStream.printf("%s fail: key=%s actual=%s expected=%s%n", topic, key, actual, expected);
+
+                    if (printResults) {
+                        resultStream.printf("\t inputEvents=%n%s%n\t" +
+                                "echoEvents=%n%s%n\tmaxEvents=%n%s%n\tminEvents=%n%s%n\tdifEvents=%n%s%n\tcntEvents=%n%s%n\ttaggEvents=%n%s%n",
+                            indent("\t\t", observedInputEvents.getOrDefault(key, new LinkedList<>())),
+                            indent("\t\t", events.getOrDefault("echo", emptyMap()).getOrDefault(key, new LinkedList<>())),
+                            indent("\t\t", events.getOrDefault("max", emptyMap()).getOrDefault(key, new LinkedList<>())),
+                            indent("\t\t", events.getOrDefault("min", emptyMap()).getOrDefault(key, new LinkedList<>())),
+                            indent("\t\t", events.getOrDefault("dif", emptyMap()).getOrDefault(key, new LinkedList<>())),
+                            indent("\t\t", events.getOrDefault("cnt", emptyMap()).getOrDefault(key, new LinkedList<>())),
+                            indent("\t\t", events.getOrDefault("tagg", emptyMap()).getOrDefault(key, new LinkedList<>())));
+
+                        // the topics above are already part of the dump; add the verified topic if it is not one of them
+                        if (!Utils.mkSet("echo", "max", "min", "dif", "cnt", "tagg").contains(topic))
+                            resultStream.printf("%sEvents=%n%s%n", topic, indent("\t\t", entry.getValue()));
+                    }
+
+                    return false;
+                }
+            }
+            return true;
+        }
+    }
+
+
+    /**
+     * Checks that a suppressed topic emitted exactly one record per (windowed) key.
+     * <p>
+     * A topic without any events passes trivially, because there is no key to inspect.
+     *
+     * @param resultStream sink for the textual report
+     * @param topic        the suppressed topic; its unsuppressed counterpart is looked up by replacing
+     *                     "-suppressed" with "-raw"
+     * @param events       all consumed records, grouped by topic and then by key
+     * @param printResults whether a failing key should also dump the raw and input events into the report
+     * @return {@code false} at the first key that has a number of records other than one
+     */
+    private static boolean verifySuppressed(final PrintStream resultStream,
+                                            @SuppressWarnings("SameParameterValue") final String topic,
+                                            final Map<String, Map<String, LinkedList<ConsumerRecord<String, Number>>>> events,
+                                            final boolean printResults) {
+        resultStream.println("verifying suppressed " + topic);
+        final Map<String, LinkedList<ConsumerRecord<String, Number>>> topicEvents = events.getOrDefault(topic, emptyMap());
+        for (final Map.Entry<String, LinkedList<ConsumerRecord<String, Number>>> entry : topicEvents.entrySet()) {
+            if (entry.getValue().size() != 1) {
+                final String key = entry.getKey();
+                resultStream.printf("fail: key=%s%n\tnon-unique result:%n%s%n",
+                                    key,
+                                    indent("\t\t", entry.getValue()));
+
+                if (printResults) {
+                    final String unsuppressedTopic = topic.replace("-suppressed", "-raw");
+                    // strip the surrounding brackets and the "@..." window suffix to get back the plain key
+                    final String unwindowedKey = key.substring(1, key.length() - 1).replaceAll("@.*", "");
+                    // The raw topic, the "data" topic, or the key within them may be missing; fall back to
+                    // empty collections so the diagnostics never fail with a NullPointerException.
+                    resultStream.printf("\tresultEvents:%n%s%n\tinputEvents:%n%s%n",
+                        indent("\t\t", events.getOrDefault(unsuppressedTopic, emptyMap()).getOrDefault(key, new LinkedList<>())),
+                        indent("\t\t", events.getOrDefault("data", emptyMap()).getOrDefault(unwindowedKey, new LinkedList<>())));
+                }
+
+                return false;
+            }
+        }
+        return true;
+    }
+
+    /**
+     * Renders the given records one per line, each line starting with {@code prefix} and ending
+     * with a newline character.
+     *
+     * @param prefix text put in front of every record
+     * @param list   the records to render
+     * @return the rendered text; empty if there are no records
+     */
+    private static String indent(@SuppressWarnings("SameParameterValue") final String prefix,
+                                 final Iterable<ConsumerRecord<String, Number>> list) {
+        final StringBuilder rendered = new StringBuilder();
+        list.forEach(event -> rendered.append(prefix).append(event).append('\n'));
+        return rendered.toString();
+    }
+
+    /**
+     * Expected sum for a key: the sum of all integers between the key's minimum and maximum
+     * (both inclusive), computed as (first + last) * count / 2.
+     *
+     * @param key a key of the form {@code "<min>-<max>"}
+     * @return the sum of the arithmetic series from min to max
+     */
+    private static Long getSum(final String key) {
+        final long lowest = getMin(key).intValue();
+        final long highest = getMax(key).intValue();
+        final long count = highest - lowest + 1L;
+        return (lowest + highest) * count / 2L;
+    }
+
+    /**
+     * Expected average for a key: the midpoint between the key's minimum and maximum.
+     *
+     * @param key a key of the form {@code "<min>-<max>"}
+     * @return (min + max) / 2 as a floating-point value
+     */
+    private static Double getAvg(final String key) {
+        // widen to long before adding so that the sum of two ints cannot overflow
+        final long lowest = getMin(key).intValue();
+        final long highest = getMax(key).intValue();
+        return (lowest + highest) / 2.0;
+    }
+
+
+    /**
+     * Verifies the "tagg" topic, whose keys are record counts and whose values say how many input
+     * keys have that count.
+     * <p>
+     * The expectation is built from the input keys (count = max - min + 1). Every "tagg" key is then
+     * compared against it using the latest record for that key; a "tagg" key without an expectation
+     * is expected to be 0.
+     *
+     * @param resultStream sink for the textual report
+     * @param allData      the generated input; only its keys are used
+     * @param taggEvents   the consumed "tagg" records grouped by key, or {@code null} if none were seen
+     * @param printResults whether a failing key should also dump its events into the report
+     * @return {@code false} if there are no "tagg" events or at the first key with an unexpected value
+     */
+    private static boolean verifyTAgg(final PrintStream resultStream,
+                                      final Map<String, Set<Integer>> allData,
+                                      final Map<String, LinkedList<ConsumerRecord<String, Number>>> taggEvents,
+                                      final boolean printResults) {
+        if (taggEvents == null) {
+            resultStream.println("tagg is missing");
+            return false;
+        }
+        if (taggEvents.isEmpty()) {
+            resultStream.println("tagg is empty");
+            return false;
+        }
+
+        resultStream.println("verifying tagg");
+
+        // generate expected answer: record count (as string) -> number of input keys with that count
+        final Map<String, Long> expectedByCount = new HashMap<>();
+        for (final String inputKey : allData.keySet()) {
+            final int lowest = getMin(inputKey).intValue();
+            final int highest = getMax(inputKey).intValue();
+            expectedByCount.merge(Long.toString(highest - lowest + 1L), 1L, Long::sum);
+        }
+
+        // check the result
+        for (final Map.Entry<String, LinkedList<ConsumerRecord<String, Number>>> entry : taggEvents.entrySet()) {
+            final String countKey = entry.getKey();
+            final LinkedList<ConsumerRecord<String, Number>> updates = entry.getValue();
+
+            final Long expectation = expectedByCount.remove(countKey);
+            final long expectedCount = expectation == null ? 0L : expectation;
+
+            if (updates.getLast().value().longValue() != expectedCount) {
+                resultStream.println("fail: key=" + countKey + " tagg=" + updates + " expected=" + expectedCount);
+
+                if (printResults) {
+                    resultStream.println("\t taggEvents: " + updates);
+                }
+                return false;
+            }
+        }
+        return true;
+    }
+
+    /**
+     * Extracts the minimum encoded in a key.
+     *
+     * @param key a key of the form {@code "<min>-<max>"}
+     * @return the integer in front of the first {@code '-'}
+     */
+    private static Number getMin(final String key) {
+        final String[] bounds = key.split("-");
+        return Integer.valueOf(bounds[0]);
+    }
+
+    /**
+     * Extracts the maximum encoded in a key.
+     *
+     * @param key a key of the form {@code "<min>-<max>"}
+     * @return the integer between the first and the second {@code '-'}
+     */
+    private static Number getMax(final String key) {
+        final String[] bounds = key.split("-");
+        return Integer.valueOf(bounds[1]);
+    }
+
+    /**
+     * Looks up the partitions of the given topics through the consumer.
+     *
+     * @param consumer the consumer used to query the partition metadata
+     * @param topics   the topics to look up, in the order their partitions should be listed
+     * @return one {@link TopicPartition} per partition reported for each topic
+     */
+    private static List<TopicPartition> getAllPartitions(final KafkaConsumer<?, ?> consumer, final String... topics) {
+        return Stream.of(topics)
+                     .flatMap(topic -> consumer.partitionsFor(topic).stream())
+                     .map(info -> new TopicPartition(info.topic(), info.partition()))
+                     .collect(Collectors.toCollection(ArrayList::new));
+    }
+
+}
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java b/streams/upgrade-system-tests-21/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java
similarity index 65%
copy from streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java
copy to streams/upgrade-system-tests-21/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java
index 90e6ccd..2dd2577 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java
+++ b/streams/upgrade-system-tests-21/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java
@@ -27,12 +27,13 @@ import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-import java.io.File;
 import java.time.Instant;
 
 public class SmokeTestUtil {
-
+    private static final Logger LOG = LoggerFactory.getLogger(SmokeTestUtil.class);
     final static int END = Integer.MAX_VALUE;
 
     static ProcessorSupplier<Object, Object> printProcessorSupplier(final String topic) {
@@ -45,21 +46,50 @@ public class SmokeTestUtil {
             public Processor<Object, Object> get() {
                 return new AbstractProcessor<Object, Object>() {
                     private int numRecordsProcessed = 0;
+                    private long smallestOffset = Long.MAX_VALUE;
+                    private long largestOffset = Long.MIN_VALUE;
 
                     @Override
                     public void init(final ProcessorContext context) {
                         super.init(context);
+                        LOG.info("[DEV] initializing processor: topic=" + topic + " taskId=" + context.taskId());
                         System.out.println("[DEV] initializing processor: topic=" + topic + " taskId=" + context.taskId());
+                        System.out.flush();
                         numRecordsProcessed = 0;
+                        smallestOffset = Long.MAX_VALUE;
+                        largestOffset = Long.MIN_VALUE;
                     }
 
                     @Override
                     public void process(final Object key, final Object value) {
                         numRecordsProcessed++;
                         if (numRecordsProcessed % 100 == 0) {
+                            LOG.info("processed " + numRecordsProcessed + " records from topic=" + topic);
                             System.out.printf("%s: %s%n", name, Instant.now());
                             System.out.println("processed " + numRecordsProcessed + " records from topic=" + topic);
                         }
+
+                        if (smallestOffset > context().offset()) {
+                            smallestOffset = context().offset();
+                        }
+                        if (largestOffset < context().offset()) {
+                            largestOffset = context().offset();
+                        }
+                    }
+
+                    // Reports what this processor instance has seen: the number of records
+                    // processed and the span of offsets observed, both to the log and to stdout.
+                    @Override
+                    public void close() {
+                        LOG.info("Close processor for task {}", context().taskId());
+                        System.out.printf("Close processor for task %s%n", context().taskId());
+                        System.out.println("processed " + numRecordsProcessed + " records");
+                        final long processed;
+                        // largestOffset < smallestOffset only while both still hold their initial
+                        // sentinel values, i.e. when no record was processed at all
+                        if (largestOffset >= smallestOffset) {
+                            // size of the inclusive offset range [smallestOffset, largestOffset]
+                            processed = 1L + largestOffset - smallestOffset;
+                        } else {
+                            processed = 0L;
+                        }
+                        System.out.println("offset " + smallestOffset + " to " + largestOffset + " -> processed " + processed);
+                        System.out.flush();
+                    }
                 };
             }
@@ -76,39 +106,19 @@ public class SmokeTestUtil {
     public static class Agg {
 
         KeyValueMapper<String, Long, KeyValue<String, Long>> selector() {
-            return new KeyValueMapper<String, Long, KeyValue<String, Long>>() {
-                @Override
-                public KeyValue<String, Long> apply(final String key, final Long value) {
-                    return new KeyValue<>(value == null ? null : Long.toString(value), 1L);
-                }
-            };
+            return (key, value) -> new KeyValue<>(value == null ? null : Long.toString(value), 1L);
         }
 
         public Initializer<Long> init() {
-            return new Initializer<Long>() {
-                @Override
-                public Long apply() {
-                    return 0L;
-                }
-            };
+            return () -> 0L;
         }
 
         Aggregator<String, Long, Long> adder() {
-            return new Aggregator<String, Long, Long>() {
-                @Override
-                public Long apply(final String aggKey, final Long value, final Long aggregate) {
-                    return aggregate + value;
-                }
-            };
+            return (aggKey, value, aggregate) -> aggregate + value;
         }
 
         Aggregator<String, Long, Long> remover() {
-            return new Aggregator<String, Long, Long>() {
-                @Override
-                public Long apply(final String aggKey, final Long value, final Long aggregate) {
-                    return aggregate - value;
-                }
-            };
+            return (aggKey, value, aggregate) -> aggregate - value;
         }
     }
 
@@ -120,14 +130,6 @@ public class SmokeTestUtil {
 
     static Serde<Double> doubleSerde = Serdes.Double();
 
-    static File createDir(final File parent, final String child) {
-        final File dir = new File(parent, child);
-
-        dir.mkdir();
-
-        return dir;
-    }
-
     public static void sleep(final long duration) {
         try {
             Thread.sleep(duration);
diff --git a/streams/upgrade-system-tests-21/src/test/java/org/apache/kafka/streams/tests/StreamsSmokeTest.java b/streams/upgrade-system-tests-21/src/test/java/org/apache/kafka/streams/tests/StreamsSmokeTest.java
new file mode 100644
index 0000000..07c7d5d
--- /dev/null
+++ b/streams/upgrade-system-tests-21/src/test/java/org/apache/kafka/streams/tests/StreamsSmokeTest.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.tests;
+
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.StreamsConfig;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.UUID;
+
+import static org.apache.kafka.streams.tests.SmokeTestDriver.generate;
+import static org.apache.kafka.streams.tests.SmokeTestDriver.generatePerpetually;
+
+/**
+ * Command-line entry point that runs either the smoke-test driver (data generation and result
+ * verification) or the smoke-test Streams client.
+ */
+public class StreamsSmokeTest {
+
+    /**
+     * <pre>
+     *  args    ::= propFileName command [disableAutoTerminate]
+     *  command ::= "run" | "process"
+     * </pre>
+     * "run" starts the driver: it generates input data and, unless a third argument is present,
+     * verifies the results afterwards; with a third argument it generates data perpetually instead.
+     * "process" starts the stream processing application.
+     *
+     * @param args the properties file name, the command, and optionally any third argument to
+     *             disable auto-termination (only its presence matters, not its value)
+     * @throws IOException if the properties file cannot be loaded
+     */
+    public static void main(final String[] args) throws IOException {
+        if (args.length < 2) {
+            System.err.println("StreamsSmokeTest are expecting two parameters: propFile, command; but only see " + args.length + " parameter");
+            System.exit(1);
+        }
+
+        final String propFileName = args[0];
+        final String command = args[1];
+        final boolean disableAutoTerminate = args.length > 2;
+
+        final Properties streamsProperties = Utils.loadProps(propFileName);
+        final String kafka = streamsProperties.getProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG);
+        final String processingGuarantee = streamsProperties.getProperty(StreamsConfig.PROCESSING_GUARANTEE_CONFIG);
+
+        if (kafka == null) {
+            System.err.println("No bootstrap kafka servers specified in " + StreamsConfig.BOOTSTRAP_SERVERS_CONFIG);
+            System.exit(1);
+        }
+
+        // the processing guarantee is only validated for the Streams client
+        final boolean knownGuarantee = StreamsConfig.AT_LEAST_ONCE.equals(processingGuarantee)
+            || StreamsConfig.EXACTLY_ONCE.equals(processingGuarantee);
+        if ("process".equals(command) && !knownGuarantee) {
+            System.err.println("processingGuarantee must be either " + StreamsConfig.AT_LEAST_ONCE + " or " +
+                StreamsConfig.EXACTLY_ONCE);
+
+            System.exit(1);
+        }
+
+        System.out.println("StreamsTest instance started (StreamsSmokeTest)");
+        System.out.println("command=" + command);
+        System.out.println("props=" + streamsProperties);
+        System.out.println("disableAutoTerminate=" + disableAutoTerminate);
+
+        if ("run".equals(command)) {
+            // this starts the driver (data generation and result verification)
+            final int numKeys = 10;
+            final int maxRecordsPerKey = 500;
+            if (disableAutoTerminate) {
+                generatePerpetually(kafka, numKeys, maxRecordsPerKey);
+            } else {
+                // slow down data production to span 30 seconds so that system tests have time to
+                // do their bounces, etc.
+                final Map<String, Set<Integer>> allData =
+                    generate(kafka, numKeys, maxRecordsPerKey, Duration.ofSeconds(30));
+                SmokeTestDriver.verify(kafka, allData, maxRecordsPerKey);
+            }
+        } else if ("process".equals(command)) {
+            // this starts the stream processing app
+            final SmokeTestClient client = new SmokeTestClient(UUID.randomUUID().toString());
+            client.start(streamsProperties);
+        } else {
+            System.out.println("unknown command: " + command);
+        }
+    }
+
+}
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java b/streams/upgrade-system-tests-22/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java
similarity index 79%
copy from streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java
copy to streams/upgrade-system-tests-22/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java
index db243fd..ced1369 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java
+++ b/streams/upgrade-system-tests-22/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java
@@ -16,11 +16,10 @@
  */
 package org.apache.kafka.streams.tests;
 
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Bytes;
-import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.KafkaThread;
+import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
@@ -38,11 +37,15 @@ import org.apache.kafka.streams.kstream.TimeWindows;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.state.Stores;
 import org.apache.kafka.streams.state.WindowStore;
-import org.apache.kafka.test.TestUtils;
 
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
 import java.time.Duration;
 import java.time.Instant;
 import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 
 import static org.apache.kafka.streams.kstream.Suppressed.untilWindowCloses;
 
@@ -50,14 +53,42 @@ public class SmokeTestClient extends SmokeTestUtil {
 
     private final String name;
 
-    private Thread thread;
     private KafkaStreams streams;
     private boolean uncaughtException = false;
     private boolean started;
-    private boolean closed;
+    private volatile boolean closed;
+
+    /**
+     * Registers {@code runnable} as a JVM shutdown hook.
+     *
+     * @param name     name for the hook thread; if {@code null}, a plain unnamed {@link Thread} is used
+     * @param runnable the action to run at JVM shutdown
+     */
+    private static void addShutdownHook(final String name, final Runnable runnable) {
+        final Thread hook = name != null
+            ? KafkaThread.nonDaemon(name, runnable)
+            : new Thread(runnable);
+        Runtime.getRuntime().addShutdownHook(hook);
+    }
+
+    /**
+     * Creates a temporary directory whose name starts with "kafka-" and arranges for it to be
+     * removed when the JVM exits: via {@link File#deleteOnExit()} and via a shutdown hook that
+     * calls {@code Utils.delete} on it (presumably a recursive delete; confirm against
+     * {@code Utils}).
+     *
+     * @return the newly created directory
+     * @throws RuntimeException if the directory cannot be created
+     */
+    private static File tempDirectory() {
+        final File directory;
+        try {
+            directory = Files.createTempDirectory("kafka-").toFile();
+        } catch (final IOException creationFailure) {
+            throw new RuntimeException("Failed to create a temp dir", creationFailure);
+        }
+        directory.deleteOnExit();
+
+        final Runnable cleanup = () -> {
+            try {
+                Utils.delete(directory);
+            } catch (final IOException deletionFailure) {
+                // best effort: report the problem on stdout and let the shutdown continue
+                System.out.println("Error deleting " + directory.getAbsolutePath());
+                deletionFailure.printStackTrace(System.out);
+            }
+        };
+        addShutdownHook("delete-temp-file-shutdown-hook", cleanup);
+
+        return directory;
+    }
 
     public SmokeTestClient(final String name) {
-        super();
         this.name = name;
     }
 
@@ -70,17 +101,43 @@ public class SmokeTestClient extends SmokeTestUtil {
     }
 
     public void start(final Properties streamsProperties) {
-        streams = createKafkaStreams(streamsProperties);
+        final Topology build = getTopology();
+        streams = new KafkaStreams(build, getStreamsConfig(streamsProperties));
+
+        final CountDownLatch countDownLatch = new CountDownLatch(1);
+        streams.setStateListener((newState, oldState) -> {
+            System.out.printf("%s %s: %s -> %s%n", name, Instant.now(), oldState, newState);
+            if (oldState == KafkaStreams.State.REBALANCING && newState == KafkaStreams.State.RUNNING) {
+                started = true;
+                countDownLatch.countDown();
+            }
+
+            if (newState == KafkaStreams.State.NOT_RUNNING) {
+                closed = true;
+            }
+        });
+
         streams.setUncaughtExceptionHandler((t, e) -> {
             System.out.println(name + ": SMOKE-TEST-CLIENT-EXCEPTION");
+            System.out.println(name + ": FATAL: An unexpected exception is encountered on thread " + t + ": " + e);
+            e.printStackTrace(System.out);
             uncaughtException = true;
-            e.printStackTrace();
+            streams.close(Duration.ofSeconds(30));
         });
 
-        Exit.addShutdownHook("streams-shutdown-hook", () -> close());
+        addShutdownHook("streams-shutdown-hook", this::close);
 
-        thread = new Thread(() -> streams.start());
-        thread.start();
+        streams.start();
+        try {
+            if (!countDownLatch.await(1, TimeUnit.MINUTES)) {
+                System.out.println(name + ": SMOKE-TEST-CLIENT-EXCEPTION: Didn't start in one minute");
+            }
+        } catch (final InterruptedException e) {
+            System.out.println(name + ": SMOKE-TEST-CLIENT-EXCEPTION: " + e);
+            e.printStackTrace(System.out);
+        }
+        System.out.println(name + ": SMOKE-TEST-CLIENT-STARTED");
+        System.out.println(name + " started at " + Instant.now());
     }
 
     public void closeAsync() {
@@ -88,17 +145,14 @@ public class SmokeTestClient extends SmokeTestUtil {
     }
 
     public void close() {
-        streams.close(Duration.ofSeconds(5));
-        // do not remove these printouts since they are needed for health scripts
-        if (!uncaughtException) {
+        final boolean closed = streams.close(Duration.ofMinutes(1));
+
+        if (closed && !uncaughtException) {
             System.out.println(name + ": SMOKE-TEST-CLIENT-CLOSED");
-        }
-        try {
-            thread.join();
-        } catch (final Exception ex) {
-            // do not remove these printouts since they are needed for health scripts
+        } else if (closed) {
             System.out.println(name + ": SMOKE-TEST-CLIENT-EXCEPTION");
-            // ignore
+        } else {
+            System.out.println(name + ": SMOKE-TEST-CLIENT-EXCEPTION: Didn't close");
         }
     }
 
@@ -106,39 +160,11 @@ public class SmokeTestClient extends SmokeTestUtil {
         final Properties fullProps = new Properties(props);
         fullProps.put(StreamsConfig.APPLICATION_ID_CONFIG, "SmokeTest");
         fullProps.put(StreamsConfig.CLIENT_ID_CONFIG, "SmokeTest-" + name);
-        fullProps.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 3);
-        fullProps.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 2);
-        fullProps.put(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, 100);
-        fullProps.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
-        fullProps.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3);
-        fullProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-        fullProps.put(ProducerConfig.ACKS_CONFIG, "all");
-        fullProps.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
+        fullProps.put(StreamsConfig.STATE_DIR_CONFIG, tempDirectory().getAbsolutePath());
         fullProps.putAll(props);
         return fullProps;
     }
 
-    private KafkaStreams createKafkaStreams(final Properties props) {
-        final Topology build = getTopology();
-        final KafkaStreams streamsClient = new KafkaStreams(build, getStreamsConfig(props));
-        streamsClient.setStateListener((newState, oldState) -> {
-            System.out.printf("%s %s: %s -> %s%n", name, Instant.now(), oldState, newState);
-            if (oldState == KafkaStreams.State.REBALANCING && newState == KafkaStreams.State.RUNNING) {
-                started = true;
-            }
-
-            if (newState == KafkaStreams.State.NOT_RUNNING) {
-                closed = true;
-            }
-        });
-        streamsClient.setUncaughtExceptionHandler((t, e) -> {
-            System.out.println(name + ": FATAL: An unexpected exception is encountered on thread " + t + ": " + e);
-            streamsClient.close(Duration.ofSeconds(30));
-        });
-
-        return streamsClient;
-    }
-
     public Topology getTopology() {
         final StreamsBuilder builder = new StreamsBuilder();
         final Consumed<String, Integer> stringIntConsumed = Consumed.with(stringSerde, intSerde);
diff --git a/streams/upgrade-system-tests-22/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java b/streams/upgrade-system-tests-22/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
new file mode 100644
index 0000000..d0a7d22
--- /dev/null
+++ b/streams/upgrade-system-tests-22/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
@@ -0,0 +1,632 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.tests;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
+
+import java.io.ByteArrayOutputStream;
+import java.io.PrintStream;
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static java.util.Collections.emptyMap;
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+
+public class SmokeTestDriver extends SmokeTestUtil {
+    // Every topic that verify() assigns to its consumer: the "data" topic written by the generators
+    // plus the other topics whose records are read back and checked.
+    private static final String[] TOPICS = {
+        "data",
+        "echo",
+        "max",
+        "min", "min-suppressed", "min-raw",
+        "dif",
+        "sum",
+        "sws-raw", "sws-suppressed",
+        "cnt",
+        "avg",
+        "tagg"
+    };
+
+    // Number of consecutive empty polls (once enough echo records have been seen) that verify()
+    // tolerates while the results are still failing, before it gives up.
+    private static final int MAX_RECORD_EMPTY_RETRIES = 30;
+
+    /**
+     * A record key ("min-max") together with the integers min..max (inclusive) that will be
+     * produced for it, handed out one at a time in a locally shuffled order.
+     */
+    private static class ValueList {
+        public final String key;
+        private final int[] values;
+        private int index;
+
+        ValueList(final int min, final int max) {
+            key = min + "-" + max;
+
+            final int size = max - min + 1;
+            values = new int[size];
+            Arrays.setAll(values, offset -> min + offset);
+
+            // The order is randomized so that processing order is not completely predictable.
+            // Values are also used as the timestamp of the record (TODO: separate data and timestamp),
+            // so some correlation between time and order is kept by shuffling within a sliding window.
+            shuffle(values, 10);
+
+            index = 0;
+        }
+
+        /**
+         * @return the next value in shuffled order, or -1 once every value has been handed out
+         */
+        int next() {
+            if (index >= values.length) {
+                return -1;
+            }
+            final int value = values[index];
+            index++;
+            return value;
+        }
+    }
+
+    /**
+     * @return a fresh copy of the topic names, so callers cannot modify the shared array
+     */
+    public static String[] topics() {
+        return TOPICS.clone();
+    }
+
+    /**
+     * Produces records to the "data" topic in an endless loop, picking a random key for every record
+     * and pausing 2 ms after each send. Progress is printed every 100 records. This method only
+     * ends if an exception is thrown.
+     *
+     * @param kafka            bootstrap servers passed to the producer
+     * @param numKeys          number of distinct keys to produce for
+     * @param maxRecordsPerKey size of the value range created for each key
+     */
+    static void generatePerpetually(final String kafka,
+                                    final int numKeys,
+                                    final int maxRecordsPerKey) {
+        final ValueList[] data = new ValueList[numKeys];
+        for (int keyIndex = 0; keyIndex < numKeys; keyIndex++) {
+            data[keyIndex] = new ValueList(keyIndex, keyIndex + maxRecordsPerKey - 1);
+        }
+
+        final Random rand = new Random();
+        int numRecordsProduced = 0;
+
+        try (final KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(generatorProperties(kafka))) {
+            for (;;) {
+                final ValueList chosen = data[rand.nextInt(numKeys)];
+                final byte[] keyBytes = stringSerde.serializer().serialize("", chosen.key);
+                final byte[] valueBytes = intSerde.serializer().serialize("", chosen.next());
+
+                producer.send(new ProducerRecord<>("data", keyBytes, valueBytes));
+
+                numRecordsProduced++;
+                if (numRecordsProduced % 100 == 0) {
+                    System.out.println(Instant.now() + " " + numRecordsProduced + " records produced");
+                }
+                Utils.sleep(2);
+            }
+        }
+    }
+
+    /**
+     * Produces every value of every key's range to the "data" topic, retries sends that timed out,
+     * and finally sends one far-future "flush" record to each partition of "data".
+     *
+     * @param kafka            bootstrap servers passed to the producer
+     * @param numKeys          number of distinct keys; key i covers the values i .. i + maxRecordsPerKey - 1
+     * @param maxRecordsPerKey number of values produced per key
+     * @param timeToSpend      target duration used to derive the pause between records
+     * @return an unmodifiable map from each key to the set of values that were sent for it
+     */
+    public static Map<String, Set<Integer>> generate(final String kafka,
+                                                     final int numKeys,
+                                                     final int maxRecordsPerKey,
+                                                     final Duration timeToSpend) {
+        final Properties producerProps = generatorProperties(kafka);
+
+
+        int numRecordsProduced = 0;
+
+        final Map<String, Set<Integer>> allData = new HashMap<>();
+        final ValueList[] data = new ValueList[numKeys];
+        for (int i = 0; i < numKeys; i++) {
+            data[i] = new ValueList(i, i + maxRecordsPerKey - 1);
+            allData.put(data[i].key, new HashSet<>());
+        }
+        final Random rand = new Random();
+
+        // data[0 .. remaining) holds the keys that still have values left to send
+        int remaining = data.length;
+
+        // per-record pause that spreads all numKeys * maxRecordsPerKey records over timeToSpend
+        // (a floor of 2 ms is applied where the pause is used below)
+        final long recordPauseTime = timeToSpend.toMillis() / numKeys / maxRecordsPerKey;
+
+        // records whose send failed with a TimeoutException; filled by TestCallback
+        List<ProducerRecord<byte[], byte[]>> needRetry = new ArrayList<>();
+
+        try (final KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(producerProps)) {
+            while (remaining > 0) {
+                final int index = rand.nextInt(remaining);
+                final String key = data[index].key;
+                final int value = data[index].next();
+
+                if (value < 0) {
+                    // this key is exhausted: shrink the live range and move the last live key into its slot
+                    remaining--;
+                    data[index] = data[remaining];
+                } else {
+
+                    final ProducerRecord<byte[], byte[]> record =
+                        new ProducerRecord<>(
+                            "data",
+                            stringSerde.serializer().serialize("", key),
+                            intSerde.serializer().serialize("", value)
+                        );
+
+                    producer.send(record, new TestCallback(record, needRetry));
+
+                    numRecordsProduced++;
+                    allData.get(key).add(value);
+                    if (numRecordsProduced % 100 == 0) {
+                        System.out.println(Instant.now() + " " + numRecordsProduced + " records produced");
+                    }
+                    Utils.sleep(Math.max(recordPauseTime, 2));
+                }
+            }
+            producer.flush();
+
+            // re-send timed-out records in rounds; each round collects its own failures in a new list
+            int remainingRetries = 5;
+            while (!needRetry.isEmpty()) {
+                final List<ProducerRecord<byte[], byte[]>> needRetry2 = new ArrayList<>();
+                for (final ProducerRecord<byte[], byte[]> record : needRetry) {
+                    System.out.println("retry producing " + stringSerde.deserializer().deserialize("", record.key()));
+                    producer.send(record, new TestCallback(record, needRetry2));
+                }
+                producer.flush();
+                needRetry = needRetry2;
+
+                // after the fifth round, any record still failing aborts via Exit.exit(1)
+                if (--remainingRetries == 0 && !needRetry.isEmpty()) {
+                    System.err.println("Failed to produce all records after multiple retries");
+                    Exit.exit(1);
+                }
+            }
+
+            // now that we've sent everything, we'll send some final records with a timestamp high enough to flush out
+            // all suppressed records.
+            final List<PartitionInfo> partitions = producer.partitionsFor("data");
+            for (final PartitionInfo partition : partitions) {
+                // explicit timestamp two days ahead of the current wall-clock time, key "flush", value 0
+                producer.send(new ProducerRecord<>(
+                    partition.topic(),
+                    partition.partition(),
+                    System.currentTimeMillis() + Duration.ofDays(2).toMillis(),
+                    stringSerde.serializer().serialize("", "flush"),
+                    intSerde.serializer().serialize("", 0)
+                ));
+            }
+        }
+        return Collections.unmodifiableMap(allData);
+    }
+
+    /**
+     * Builds the producer configuration shared by the generators: client id "SmokeTest",
+     * byte-array key and value serializers, and acks=all.
+     *
+     * @param kafka bootstrap servers
+     */
+    private static Properties generatorProperties(final String kafka) {
+        final Properties config = new Properties();
+        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
+        config.put(ProducerConfig.CLIENT_ID_CONFIG, "SmokeTest");
+        config.put(ProducerConfig.ACKS_CONFIG, "all");
+        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
+        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
+        return config;
+    }
+
+    /**
+     * Producer callback that queues the record for a retry when the send failed with a
+     * {@link TimeoutException}; any other failure prints the stack trace and calls Exit.exit(1).
+     */
+    private static class TestCallback implements Callback {
+        private final ProducerRecord<byte[], byte[]> record;
+        private final List<ProducerRecord<byte[], byte[]>> retryQueue;
+
+        TestCallback(final ProducerRecord<byte[], byte[]> originalRecord,
+                     final List<ProducerRecord<byte[], byte[]>> needRetry) {
+            record = originalRecord;
+            retryQueue = needRetry;
+        }
+
+        @Override
+        public void onCompletion(final RecordMetadata metadata, final Exception exception) {
+            if (exception == null) {
+                // the send succeeded; nothing to do
+                return;
+            }
+            if (exception instanceof TimeoutException) {
+                retryQueue.add(record);
+                return;
+            }
+            exception.printStackTrace();
+            Exit.exit(1);
+        }
+    }
+
+    /**
+     * Shuffles {@code data} in place. Each position, visited left to right, is swapped with a
+     * randomly chosen position among itself and the following {@code windowSize - 1} positions
+     * (fewer near the end of the array).
+     */
+    private static void shuffle(final int[] data, @SuppressWarnings("SameParameterValue") final int windowSize) {
+        final Random random = new Random();
+        int position = 0;
+        while (position < data.length) {
+            // candidates are limited to the window, clipped at the end of the array
+            final int span = Math.min(data.length - position, windowSize);
+            final int target = position + random.nextInt(span);
+
+            final int held = data[target];
+            data[target] = data[position];
+            data[position] = held;
+
+            position++;
+        }
+    }
+
+    /**
+     * Deserializer that picks the int, long or double deserializer according to the topic
+     * the bytes were read from.
+     */
+    public static class NumberDeserializer implements Deserializer<Number> {
+        @Override
+        public void configure(final Map<String, ?> configs, final boolean isKey) {
+            // nothing to configure
+        }
+
+        /**
+         * @throws RuntimeException if {@code topic} is not one of the known topic names
+         */
+        @Override
+        public Number deserialize(final String topic, final byte[] data) {
+            switch (topic) {
+                case "data":
+                case "echo":
+                case "min":
+                case "min-raw":
+                case "min-suppressed":
+                case "sws-raw":
+                case "sws-suppressed":
+                case "max":
+                case "dif":
+                    return intSerde.deserializer().deserialize(topic, data);
+                case "sum":
+                case "cnt":
+                case "tagg":
+                    return longSerde.deserializer().deserialize(topic, data);
+                case "avg":
+                    return doubleSerde.deserializer().deserialize(topic, data);
+                default:
+                    throw new RuntimeException("unknown topic: " + topic);
+            }
+        }
+
+        @Override
+        public void close() {
+            // nothing to release
+        }
+    }
+
+    /**
+     * Reads all of {@code TOPICS} from the beginning for up to six minutes, collecting the records
+     * per topic and key, and checks them against the generated input.
+     * <p>
+     * Polling stops early once enough "echo" records were seen and either the checks pass or more
+     * than {@code MAX_RECORD_EMPTY_RETRIES} consecutive empty polls still fail. A summary is
+     * printed to standard out, ending in "SUCCESS" or "FAILURE".
+     *
+     * @param kafka            bootstrap servers passed to the consumer
+     * @param inputs           the values that were produced, by key
+     * @param maxRecordsPerKey number of values produced per key; with {@code inputs.size()} it gives
+     *                         the expected number of echo records
+     * @return the result of the last run of the per-topic checks
+     */
+    public static VerificationResult verify(final String kafka,
+                                            final Map<String, Set<Integer>> inputs,
+                                            final int maxRecordsPerKey) {
+        final Properties props = new Properties();
+        props.put(ConsumerConfig.CLIENT_ID_CONFIG, "verifier");
+        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
+        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, NumberDeserializer.class);
+        props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
+
+        final int recordsGenerated = inputs.size() * maxRecordsPerKey;
+        int recordsProcessed = 0;
+        final Map<String, AtomicInteger> processed =
+            Stream.of(TOPICS)
+                  .collect(Collectors.toMap(t -> t, t -> new AtomicInteger(0)));
+
+        final Map<String, Map<String, LinkedList<ConsumerRecord<String, Number>>>> events = new HashMap<>();
+
+        VerificationResult verificationResult = new VerificationResult(false, "no results yet");
+        int retry = 0;
+        final long start;
+
+        final KafkaConsumer<String, Number> consumer = new KafkaConsumer<>(props);
+        try {
+            final List<TopicPartition> partitions = getAllPartitions(consumer, TOPICS);
+            consumer.assign(partitions);
+            consumer.seekToBeginning(partitions);
+
+            start = System.currentTimeMillis();
+            while (System.currentTimeMillis() - start < TimeUnit.MINUTES.toMillis(6)) {
+                final ConsumerRecords<String, Number> records = consumer.poll(Duration.ofSeconds(5));
+                if (records.isEmpty() && recordsProcessed >= recordsGenerated) {
+                    verificationResult = verifyAll(inputs, events, false);
+                    if (verificationResult.passed()) {
+                        break;
+                    } else if (retry++ > MAX_RECORD_EMPTY_RETRIES) {
+                        System.out.println(Instant.now() + " Didn't get any more results, verification hasn't passed, and out of retries.");
+                        break;
+                    } else {
+                        System.out.println(Instant.now() + " Didn't get any more results, but verification hasn't passed (yet). Retrying..." + retry);
+                    }
+                } else {
+                    System.out.println(Instant.now() + " Get some more results from " + records.partitions() + ", resetting retry.");
+
+                    retry = 0;
+                    for (final ConsumerRecord<String, Number> record : records) {
+                        final String key = record.key();
+
+                        final String topic = record.topic();
+                        processed.get(topic).incrementAndGet();
+
+                        // progress is measured by the number of records seen on the "echo" topic
+                        if (topic.equals("echo")) {
+                            recordsProcessed++;
+                            if (recordsProcessed % 100 == 0) {
+                                System.out.println("Echo records processed = " + recordsProcessed);
+                            }
+                        }
+
+                        events.computeIfAbsent(topic, t -> new HashMap<>())
+                              .computeIfAbsent(key, k -> new LinkedList<>())
+                              .add(record);
+                    }
+
+                    System.out.println(processed);
+                }
+            }
+        } finally {
+            // release the consumer even when assignment or polling throws
+            consumer.close();
+        }
+        final long finished = System.currentTimeMillis() - start;
+        System.out.println("Verification time=" + finished);
+        System.out.println("-------------------");
+        System.out.println("Result Verification");
+        System.out.println("-------------------");
+        System.out.println("recordGenerated=" + recordsGenerated);
+        System.out.println("recordProcessed=" + recordsProcessed);
+
+        if (recordsProcessed > recordsGenerated) {
+            System.out.println("PROCESSED-MORE-THAN-GENERATED");
+        } else if (recordsProcessed < recordsGenerated) {
+            System.out.println("PROCESSED-LESS-THAN-GENERATED");
+        }
+
+        boolean success;
+
+        // An absent "echo" entry (no echo record consumed at all) is treated as "nothing received"
+        // instead of failing with a NullPointerException before any result is reported.
+        final Map<String, Set<Number>> received =
+            events.getOrDefault("echo", emptyMap())
+                  .entrySet()
+                  .stream()
+                  .map(entry -> mkEntry(
+                      entry.getKey(),
+                      entry.getValue().stream().map(ConsumerRecord::value).collect(Collectors.toSet()))
+                  )
+                  .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+
+        success = inputs.equals(received);
+
+        if (success) {
+            System.out.println("ALL-RECORDS-DELIVERED");
+        } else {
+            // count the generated values that never showed up on "echo"; a key that is missing
+            // entirely contributes all of its values
+            int missedCount = 0;
+            for (final Map.Entry<String, Set<Integer>> entry : inputs.entrySet()) {
+                final Set<Number> receivedValues = received.getOrDefault(entry.getKey(), Collections.emptySet());
+                for (final Integer expectedValue : entry.getValue()) {
+                    if (!receivedValues.contains(expectedValue)) {
+                        missedCount++;
+                    }
+                }
+            }
+            System.out.println("missedRecords=" + missedCount);
+        }
+
+        // give it one more try if it's not already passing.
+        if (!verificationResult.passed()) {
+            verificationResult = verifyAll(inputs, events, true);
+        }
+        success &= verificationResult.passed();
+
+        System.out.println(verificationResult.result());
+
+        System.out.println(success ? "SUCCESS" : "FAILURE");
+        return verificationResult;
+    }
+
+    /**
+     * Immutable outcome of a verification run: a pass/fail flag plus the text produced by the checks.
+     */
+    public static class VerificationResult {
+        private final boolean success;
+        private final String report;
+
+        VerificationResult(final boolean passed, final String result) {
+            success = passed;
+            report = result;
+        }
+
+        /** @return whether the verification passed */
+        public boolean passed() {
+            return success;
+        }
+
+        /** @return the text that was recorded for this verification */
+        public String result() {
+            return report;
+        }
+    }
+
+    private static VerificationResult verifyAll(final Map<String, Set<Integer>> inputs,
+                                                final Map<String, Map<String, LinkedList<ConsumerRecord<String, Number>>>> events,
+                                                final boolean printResults) {
+        final ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+        boolean pass;
+        try (final PrintStream resultStream = new PrintStream(byteArrayOutputStream)) {
+            pass = verifyTAgg(resultStream, inputs, events.get("tagg"), printResults);
+            pass &= verifySuppressed(resultStream, "min-suppressed", events, printResults);
+            pass &= verify(resultStream, "min-suppressed", inputs, events, windowedKey -> {
+                final String unwindowedKey = windowedKey.substring(1, windowedKey.length() - 1).replaceAll("@.*", "");
+                return getMin(unwindowedKey);
+            }, printResults);
+            pass &= verifySuppressed(resultStream, "sws-suppressed", events, printResults);
+            pass &= verify(resultStream, "min", inputs, events, SmokeTestDriver::getMin, printResults);
+            pass &= verify(resultStream, "max", inputs, events, SmokeTestDriver::getMax, printResults);
+            pass &= verify(resultStream, "dif", inputs, events, key -> getMax(key).intValue() - getMin(key).intValue(), printResults);
+            pass &= verify(resultStream, "sum", inputs, events, SmokeTestDriver::getSum, printResults);
+            pass &= verify(resultStream, "cnt", inputs, events, key1 -> getMax(key1).intValue() - getMin(key1).intValue() + 1L, printResults);
+            pass &= verify(resultStream, "avg", inputs, events, SmokeTestDriver::getAvg, printResults);
+        }
+        return new VerificationResult(pass, new String(byteArrayOutputStream.toByteArray(), StandardCharsets.UTF_8));
+    }
+
+    /**
+     * Checks that {@code topic} has one entry per input key and that the last record of every key
+     * carries the expected value.
+     *
+     * @param resultStream     where the outcome and failure details are written
+     * @param topic            the output topic to check
+     * @param inputData        the values that were produced, by key
+     * @param events           consumed records, by topic and then by key
+     * @param keyToExpectation computes the expected final value from an output key
+     * @param printResults     whether a failure should also list the related events of other topics
+     * @return {@code true} if the topic is non-empty, has as many keys as the input and every
+     *         key's last value matches; {@code false} on the first violation
+     */
+    private static boolean verify(final PrintStream resultStream,
+                                  final String topic,
+                                  final Map<String, Set<Integer>> inputData,
+                                  final Map<String, Map<String, LinkedList<ConsumerRecord<String, Number>>>> events,
+                                  final Function<String, Number> keyToExpectation,
+                                  final boolean printResults) {
+        final Map<String, LinkedList<ConsumerRecord<String, Number>>> outputEvents = events.getOrDefault(topic, emptyMap());
+        if (outputEvents.isEmpty()) {
+            resultStream.println(topic + " is empty");
+            return false;
+        } else {
+            resultStream.printf("verifying %s with %d keys%n", topic, outputEvents.size());
+
+            if (outputEvents.size() != inputData.size()) {
+                resultStream.printf("fail: resultCount=%d expectedCount=%s%n\tresult=%s%n\texpected=%s%n",
+                                    outputEvents.size(), inputData.size(), outputEvents.keySet(), inputData.keySet());
+                return false;
+            }
+            for (final Map.Entry<String, LinkedList<ConsumerRecord<String, Number>>> entry : outputEvents.entrySet()) {
+                final String key = entry.getKey();
+                final Number expected = keyToExpectation.apply(key);
+                final Number actual = entry.getValue().getLast().value();
+                if (!expected.equals(actual)) {
+                    resultStream.printf("%s fail: key=%s actual=%s expected=%s%n", topic, key, actual, expected);
+
+                    if (printResults) {
+                        // The "data" lookup falls back to an empty list like the other topics do:
+                        // output keys of windowed topics do not occur in "data", and the failure
+                        // report must not itself die with a NullPointerException.
+                        resultStream.printf("\t inputEvents=%n%s%n\t" +
+                                "echoEvents=%n%s%n\tmaxEvents=%n%s%n\tminEvents=%n%s%n\tdifEvents=%n%s%n\tcntEvents=%n%s%n\ttaggEvents=%n%s%n",
+                            indent("\t\t", events.getOrDefault("data", emptyMap()).getOrDefault(key, new LinkedList<>())),
+                            indent("\t\t", events.getOrDefault("echo", emptyMap()).getOrDefault(key, new LinkedList<>())),
+                            indent("\t\t", events.getOrDefault("max", emptyMap()).getOrDefault(key, new LinkedList<>())),
+                            indent("\t\t", events.getOrDefault("min", emptyMap()).getOrDefault(key, new LinkedList<>())),
+                            indent("\t\t", events.getOrDefault("dif", emptyMap()).getOrDefault(key, new LinkedList<>())),
+                            indent("\t\t", events.getOrDefault("cnt", emptyMap()).getOrDefault(key, new LinkedList<>())),
+                            indent("\t\t", events.getOrDefault("tagg", emptyMap()).getOrDefault(key, new LinkedList<>())));
+
+                        // topics not covered by the listing above get their own events appended
+                        if (!Utils.mkSet("echo", "max", "min", "dif", "cnt", "tagg").contains(topic)) {
+                            resultStream.printf("%sEvents=%n%s%n", topic, indent("\t\t", entry.getValue()));
+                        }
+                    }
+
+                    return false;
+                }
+            }
+            return true;
+        }
+    }
+
+
+    /**
+     * Checks that every key of a suppressed topic received exactly one record.
+     *
+     * @param resultStream where the outcome and failure details are written
+     * @param topic        the suppressed topic; its "-raw" sibling is derived by replacing "-suppressed"
+     * @param events       consumed records, by topic and then by key
+     * @param printResults whether a failure should also list the raw and input events of the key
+     * @return {@code false} at the first key with a record count other than one, {@code true}
+     *         otherwise (including when the topic has no events at all)
+     */
+    private static boolean verifySuppressed(final PrintStream resultStream,
+                                            @SuppressWarnings("SameParameterValue") final String topic,
+                                            final Map<String, Map<String, LinkedList<ConsumerRecord<String, Number>>>> events,
+                                            final boolean printResults) {
+        resultStream.println("verifying suppressed " + topic);
+        final Map<String, LinkedList<ConsumerRecord<String, Number>>> topicEvents = events.getOrDefault(topic, emptyMap());
+        for (final Map.Entry<String, LinkedList<ConsumerRecord<String, Number>>> entry : topicEvents.entrySet()) {
+            if (entry.getValue().size() != 1) {
+                final String unsuppressedTopic = topic.replace("-suppressed", "-raw");
+                final String key = entry.getKey();
+                // drop the first and last character of the key and everything from '@' on
+                final String unwindowedKey = key.substring(1, key.length() - 1).replaceAll("@.*", "");
+                resultStream.printf("fail: key=%s%n\tnon-unique result:%n%s%n",
+                                    key,
+                                    indent("\t\t", entry.getValue()));
+
+                if (printResults) {
+                    // Missing topics or keys are printed as empty listings so that reporting the
+                    // failure cannot itself throw a NullPointerException.
+                    resultStream.printf("\tresultEvents:%n%s%n\tinputEvents:%n%s%n",
+                        indent("\t\t", events.getOrDefault(unsuppressedTopic, emptyMap()).getOrDefault(key, new LinkedList<>())),
+                        indent("\t\t", events.getOrDefault("data", emptyMap()).getOrDefault(unwindowedKey, new LinkedList<>())));
+                }
+
+                return false;
+            }
+        }
+        return true;
+    }
+
+    /**
+     * Renders each record on its own line, preceded by {@code prefix} and terminated by '\n'.
+     */
+    private static String indent(@SuppressWarnings("SameParameterValue") final String prefix,
+                                 final Iterable<ConsumerRecord<String, Number>> list) {
+        final StringBuilder rendered = new StringBuilder();
+        list.forEach(record -> rendered.append(prefix).append(record).append('\n'));
+        return rendered.toString();
+    }
+
+    /**
+     * @return the sum of all integers from the key's minimum to its maximum (inclusive),
+     *         computed as (min + max) * count / 2 in long arithmetic
+     */
+    private static Long getSum(final String key) {
+        final int min = getMin(key).intValue();
+        final int max = getMax(key).intValue();
+        final long endpointSum = (long) min + max;
+        final long count = max - min + 1L;
+        return endpointSum * count / 2L;
+    }
+
+    /**
+     * @return the midpoint between the key's minimum and maximum, as a double
+     */
+    private static Double getAvg(final String key) {
+        final int min = getMin(key).intValue();
+        final int max = getMax(key).intValue();
+        final long endpointSum = (long) min + max;
+        return endpointSum / 2.0;
+    }
+
+
+    /**
+     * Checks the "tagg" events: for every key seen there, the last value must equal the number of
+     * input keys whose record count (max - min + 1), rendered as a string, equals that key.
+     * Keys without any matching input are expected to carry 0.
+     *
+     * @param resultStream where the outcome and failure details are written
+     * @param allData      the values that were produced, by key
+     * @param taggEvents   the consumed "tagg" records by key, or {@code null} if none were consumed
+     * @param printResults whether a failure should also list the key's tagg events
+     * @return {@code true} if the events are present and every observed key matches
+     */
+    private static boolean verifyTAgg(final PrintStream resultStream,
+                                      final Map<String, Set<Integer>> allData,
+                                      final Map<String, LinkedList<ConsumerRecord<String, Number>>> taggEvents,
+                                      final boolean printResults) {
+        if (taggEvents == null) {
+            resultStream.println("tagg is missing");
+            return false;
+        }
+        if (taggEvents.isEmpty()) {
+            resultStream.println("tagg is empty");
+            return false;
+        }
+
+        resultStream.println("verifying tagg");
+
+        // expected answer: how many input keys share each record count
+        final Map<String, Long> expected = new HashMap<>();
+        for (final String inputKey : allData.keySet()) {
+            final int min = getMin(inputKey).intValue();
+            final int max = getMax(inputKey).intValue();
+            expected.merge(Long.toString(max - min + 1L), 1L, Long::sum);
+        }
+
+        // compare the last observed value of every tagg key against the expectation
+        for (final Map.Entry<String, LinkedList<ConsumerRecord<String, Number>>> entry : taggEvents.entrySet()) {
+            final String key = entry.getKey();
+            final Long removed = expected.remove(key);
+            final long expectedCount = removed == null ? 0L : removed;
+            final long actualCount = entry.getValue().getLast().value().longValue();
+
+            if (actualCount != expectedCount) {
+                resultStream.println("fail: key=" + key + " tagg=" + entry.getValue() + " expected=" + expectedCount);
+
+                if (printResults) {
+                    resultStream.println("\t taggEvents: " + entry.getValue());
+                }
+                return false;
+            }
+        }
+        return true;
+    }
+
+    private static Number getMin(final String key) {
+        return Integer.parseInt(key.split("-")[0]);
+    }
+
+    private static Number getMax(final String key) {
+        return Integer.parseInt(key.split("-")[1]);
+    }
+
+    /**
+     * Looks up the partitions of each topic through the consumer, in the order the topics are given.
+     *
+     * @return a new list with one {@link TopicPartition} per partition reported by the consumer
+     */
+    private static List<TopicPartition> getAllPartitions(final KafkaConsumer<?, ?> consumer, final String... topics) {
+        return Arrays.stream(topics)
+                     .flatMap(topic -> consumer.partitionsFor(topic).stream())
+                     .map(info -> new TopicPartition(info.topic(), info.partition()))
+                     .collect(Collectors.toCollection(ArrayList::new));
+    }
+
+}
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java b/streams/upgrade-system-tests-22/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java
similarity index 65%
copy from streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java
copy to streams/upgrade-system-tests-22/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java
index 90e6ccd..2dd2577 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java
+++ b/streams/upgrade-system-tests-22/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java
@@ -27,12 +27,13 @@ import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-import java.io.File;
 import java.time.Instant;
 
 public class SmokeTestUtil {
-
+    private static final Logger LOG = LoggerFactory.getLogger(SmokeTestUtil.class);
     final static int END = Integer.MAX_VALUE;
 
     static ProcessorSupplier<Object, Object> printProcessorSupplier(final String topic) {
@@ -45,21 +46,50 @@ public class SmokeTestUtil {
             public Processor<Object, Object> get() {
                 return new AbstractProcessor<Object, Object>() {
                     private int numRecordsProcessed = 0;
+                    private long smallestOffset = Long.MAX_VALUE;
+                    private long largestOffset = Long.MIN_VALUE;
 
                     @Override
                     public void init(final ProcessorContext context) {
                         super.init(context);
+                        LOG.info("[DEV] initializing processor: topic=" + topic + " taskId=" + context.taskId());
                         System.out.println("[DEV] initializing processor: topic=" + topic + " taskId=" + context.taskId());
+                        System.out.flush();
                         numRecordsProcessed = 0;
+                        smallestOffset = Long.MAX_VALUE;
+                        largestOffset = Long.MIN_VALUE;
                     }
 
                     @Override
                     public void process(final Object key, final Object value) {
                         numRecordsProcessed++;
                         if (numRecordsProcessed % 100 == 0) {
+                            LOG.info("processed " + numRecordsProcessed + " records from topic=" + topic);
                             System.out.printf("%s: %s%n", name, Instant.now());
                             System.out.println("processed " + numRecordsProcessed + " records from topic=" + topic);
                         }
+
+                        if (smallestOffset > context().offset()) {
+                            smallestOffset = context().offset();
+                        }
+                        if (largestOffset < context().offset()) {
+                            largestOffset = context().offset();
+                        }
+                    }
+
+                    @Override
+                    public void close() {
+                        LOG.info("Close processor for task {}", context().taskId());
+                        System.out.printf("Close processor for task %s%n", context().taskId());
+                        System.out.println("processed " + numRecordsProcessed + " records");
+                        final long processed;
+                        if (largestOffset >= smallestOffset) {
+                            processed = 1L + largestOffset - smallestOffset;
+                        } else {
+                            processed = 0L;
+                        }
+                        System.out.println("offset " + smallestOffset + " to " + largestOffset + " -> processed " + processed);
+                        System.out.flush();
                     }
                 };
             }
@@ -76,39 +106,19 @@ public class SmokeTestUtil {
     public static class Agg {
 
         KeyValueMapper<String, Long, KeyValue<String, Long>> selector() {
-            return new KeyValueMapper<String, Long, KeyValue<String, Long>>() {
-                @Override
-                public KeyValue<String, Long> apply(final String key, final Long value) {
-                    return new KeyValue<>(value == null ? null : Long.toString(value), 1L);
-                }
-            };
+            return (key, value) -> new KeyValue<>(value == null ? null : Long.toString(value), 1L);
         }
 
         public Initializer<Long> init() {
-            return new Initializer<Long>() {
-                @Override
-                public Long apply() {
-                    return 0L;
-                }
-            };
+            return () -> 0L;
         }
 
         Aggregator<String, Long, Long> adder() {
-            return new Aggregator<String, Long, Long>() {
-                @Override
-                public Long apply(final String aggKey, final Long value, final Long aggregate) {
-                    return aggregate + value;
-                }
-            };
+            return (aggKey, value, aggregate) -> aggregate + value;
         }
 
         Aggregator<String, Long, Long> remover() {
-            return new Aggregator<String, Long, Long>() {
-                @Override
-                public Long apply(final String aggKey, final Long value, final Long aggregate) {
-                    return aggregate - value;
-                }
-            };
+            return (aggKey, value, aggregate) -> aggregate - value;
         }
     }
 
@@ -120,14 +130,6 @@ public class SmokeTestUtil {
 
     static Serde<Double> doubleSerde = Serdes.Double();
 
-    static File createDir(final File parent, final String child) {
-        final File dir = new File(parent, child);
-
-        dir.mkdir();
-
-        return dir;
-    }
-
     public static void sleep(final long duration) {
         try {
             Thread.sleep(duration);
diff --git a/streams/upgrade-system-tests-22/src/test/java/org/apache/kafka/streams/tests/StreamsSmokeTest.java b/streams/upgrade-system-tests-22/src/test/java/org/apache/kafka/streams/tests/StreamsSmokeTest.java
new file mode 100644
index 0000000..07c7d5d
--- /dev/null
+++ b/streams/upgrade-system-tests-22/src/test/java/org/apache/kafka/streams/tests/StreamsSmokeTest.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.tests;
+
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.StreamsConfig;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.UUID;
+
+import static org.apache.kafka.streams.tests.SmokeTestDriver.generate;
+import static org.apache.kafka.streams.tests.SmokeTestDriver.generatePerpetually;
+
+public class StreamsSmokeTest {
+
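+    /**
+     *  args ::= propFileName command [disableAutoTerminate]
+     *  command ::= "run" | "process"
+     *
+     * @param args see the grammar above; bootstrap servers are read from propFileName, and the presence of a third argument sets disableAutoTerminate
+     */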
+    public static void main(final String[] args) throws IOException {
+        if (args.length < 2) {
+            System.err.println("StreamsSmokeTest are expecting two parameters: propFile, command; but only see " + args.length + " parameter");
+            System.exit(1);
+        }
+
+        final String propFileName = args[0];
+        final String command = args[1];
+        final boolean disableAutoTerminate = args.length > 2;
+
+        final Properties streamsProperties = Utils.loadProps(propFileName);
+        final String kafka = streamsProperties.getProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG);
+        final String processingGuarantee = streamsProperties.getProperty(StreamsConfig.PROCESSING_GUARANTEE_CONFIG);
+
+        if (kafka == null) {
+            System.err.println("No bootstrap kafka servers specified in " + StreamsConfig.BOOTSTRAP_SERVERS_CONFIG);
+            System.exit(1);
+        }
+
+        if ("process".equals(command)) {
+            if (!StreamsConfig.AT_LEAST_ONCE.equals(processingGuarantee) &&
+                !StreamsConfig.EXACTLY_ONCE.equals(processingGuarantee)) {
+
+                System.err.println("processingGuarantee must be either " + StreamsConfig.AT_LEAST_ONCE + " or " +
+                    StreamsConfig.EXACTLY_ONCE);
+
+                System.exit(1);
+            }
+        }
+
+        System.out.println("StreamsTest instance started (StreamsSmokeTest)");
+        System.out.println("command=" + command);
+        System.out.println("props=" + streamsProperties);
+        System.out.println("disableAutoTerminate=" + disableAutoTerminate);
+
+        switch (command) {
+            case "run":
+                // this starts the driver (data generation and result verification)
+                final int numKeys = 10;
+                final int maxRecordsPerKey = 500;
+                if (disableAutoTerminate) {
+                    generatePerpetually(kafka, numKeys, maxRecordsPerKey);
+                } else {
+                    // slow down data production to span 30 seconds so that system tests have time to
+                    // do their bounces, etc.
+                    final Map<String, Set<Integer>> allData =
+                        generate(kafka, numKeys, maxRecordsPerKey, Duration.ofSeconds(30));
+                    SmokeTestDriver.verify(kafka, allData, maxRecordsPerKey);
+                }
+                break;
+            case "process":
+                // this starts the stream processing app
+                new SmokeTestClient(UUID.randomUUID().toString()).start(streamsProperties);
+                break;
+            default:
+                System.out.println("unknown command: " + command);
+        }
+    }
+
+}
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java b/streams/upgrade-system-tests-23/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java
similarity index 79%
copy from streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java
copy to streams/upgrade-system-tests-23/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java
index db243fd..ced1369 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java
+++ b/streams/upgrade-system-tests-23/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java
@@ -16,11 +16,10 @@
  */
 package org.apache.kafka.streams.tests;
 
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Bytes;
-import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.KafkaThread;
+import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
@@ -38,11 +37,15 @@ import org.apache.kafka.streams.kstream.TimeWindows;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.state.Stores;
 import org.apache.kafka.streams.state.WindowStore;
-import org.apache.kafka.test.TestUtils;
 
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
 import java.time.Duration;
 import java.time.Instant;
 import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 
 import static org.apache.kafka.streams.kstream.Suppressed.untilWindowCloses;
 
@@ -50,14 +53,42 @@ public class SmokeTestClient extends SmokeTestUtil {
 
     private final String name;
 
-    private Thread thread;
     private KafkaStreams streams;
     private boolean uncaughtException = false;
     private boolean started;
-    private boolean closed;
+    private volatile boolean closed;
+
+    private static void addShutdownHook(final String name, final Runnable runnable) {
+        if (name != null) {
+            Runtime.getRuntime().addShutdownHook(KafkaThread.nonDaemon(name, runnable));
+        } else {
+            Runtime.getRuntime().addShutdownHook(new Thread(runnable));
+        }
+    }
+
+    private static File tempDirectory() {
+        final String prefix = "kafka-";
+        final File file;
+        try {
+            file = Files.createTempDirectory(prefix).toFile();
+        } catch (final IOException ex) {
+            throw new RuntimeException("Failed to create a temp dir", ex);
+        }
+        file.deleteOnExit();
+
+        addShutdownHook("delete-temp-file-shutdown-hook", () -> {
+            try {
+                Utils.delete(file);
+            } catch (final IOException e) {
+                System.out.println("Error deleting " + file.getAbsolutePath());
+                e.printStackTrace(System.out);
+            }
+        });
+
+        return file;
+    }
 
     public SmokeTestClient(final String name) {
-        super();
         this.name = name;
     }
 
@@ -70,17 +101,43 @@ public class SmokeTestClient extends SmokeTestUtil {
     }
 
     public void start(final Properties streamsProperties) {
-        streams = createKafkaStreams(streamsProperties);
+        final Topology build = getTopology();
+        streams = new KafkaStreams(build, getStreamsConfig(streamsProperties));
+
+        final CountDownLatch countDownLatch = new CountDownLatch(1);
+        streams.setStateListener((newState, oldState) -> {
+            System.out.printf("%s %s: %s -> %s%n", name, Instant.now(), oldState, newState);
+            if (oldState == KafkaStreams.State.REBALANCING && newState == KafkaStreams.State.RUNNING) {
+                started = true;
+                countDownLatch.countDown();
+            }
+
+            if (newState == KafkaStreams.State.NOT_RUNNING) {
+                closed = true;
+            }
+        });
+
         streams.setUncaughtExceptionHandler((t, e) -> {
             System.out.println(name + ": SMOKE-TEST-CLIENT-EXCEPTION");
+            System.out.println(name + ": FATAL: An unexpected exception is encountered on thread " + t + ": " + e);
+            e.printStackTrace(System.out);
             uncaughtException = true;
-            e.printStackTrace();
+            streams.close(Duration.ofSeconds(30));
         });
 
-        Exit.addShutdownHook("streams-shutdown-hook", () -> close());
+        addShutdownHook("streams-shutdown-hook", this::close);
 
-        thread = new Thread(() -> streams.start());
-        thread.start();
+        streams.start();
+        try {
+            if (!countDownLatch.await(1, TimeUnit.MINUTES)) {
+                System.out.println(name + ": SMOKE-TEST-CLIENT-EXCEPTION: Didn't start in one minute");
+            }
+        } catch (final InterruptedException e) {
+            System.out.println(name + ": SMOKE-TEST-CLIENT-EXCEPTION: " + e);
+            e.printStackTrace(System.out);
+        }
+        System.out.println(name + ": SMOKE-TEST-CLIENT-STARTED");
+        System.out.println(name + " started at " + Instant.now());
     }
 
     public void closeAsync() {
@@ -88,17 +145,14 @@ public class SmokeTestClient extends SmokeTestUtil {
     }
 
     public void close() {
-        streams.close(Duration.ofSeconds(5));
-        // do not remove these printouts since they are needed for health scripts
-        if (!uncaughtException) {
+        final boolean closed = streams.close(Duration.ofMinutes(1));
+
+        if (closed && !uncaughtException) {
             System.out.println(name + ": SMOKE-TEST-CLIENT-CLOSED");
-        }
-        try {
-            thread.join();
-        } catch (final Exception ex) {
-            // do not remove these printouts since they are needed for health scripts
+        } else if (closed) {
             System.out.println(name + ": SMOKE-TEST-CLIENT-EXCEPTION");
-            // ignore
+        } else {
+            System.out.println(name + ": SMOKE-TEST-CLIENT-EXCEPTION: Didn't close");
         }
     }
 
@@ -106,39 +160,11 @@ public class SmokeTestClient extends SmokeTestUtil {
         final Properties fullProps = new Properties(props);
         fullProps.put(StreamsConfig.APPLICATION_ID_CONFIG, "SmokeTest");
         fullProps.put(StreamsConfig.CLIENT_ID_CONFIG, "SmokeTest-" + name);
-        fullProps.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 3);
-        fullProps.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 2);
-        fullProps.put(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, 100);
-        fullProps.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
-        fullProps.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3);
-        fullProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-        fullProps.put(ProducerConfig.ACKS_CONFIG, "all");
-        fullProps.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
+        fullProps.put(StreamsConfig.STATE_DIR_CONFIG, tempDirectory().getAbsolutePath());
         fullProps.putAll(props);
         return fullProps;
     }
 
-    private KafkaStreams createKafkaStreams(final Properties props) {
-        final Topology build = getTopology();
-        final KafkaStreams streamsClient = new KafkaStreams(build, getStreamsConfig(props));
-        streamsClient.setStateListener((newState, oldState) -> {
-            System.out.printf("%s %s: %s -> %s%n", name, Instant.now(), oldState, newState);
-            if (oldState == KafkaStreams.State.REBALANCING && newState == KafkaStreams.State.RUNNING) {
-                started = true;
-            }
-
-            if (newState == KafkaStreams.State.NOT_RUNNING) {
-                closed = true;
-            }
-        });
-        streamsClient.setUncaughtExceptionHandler((t, e) -> {
-            System.out.println(name + ": FATAL: An unexpected exception is encountered on thread " + t + ": " + e);
-            streamsClient.close(Duration.ofSeconds(30));
-        });
-
-        return streamsClient;
-    }
-
     public Topology getTopology() {
         final StreamsBuilder builder = new StreamsBuilder();
         final Consumed<String, Integer> stringIntConsumed = Consumed.with(stringSerde, intSerde);
diff --git a/streams/upgrade-system-tests-23/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java b/streams/upgrade-system-tests-23/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
new file mode 100644
index 0000000..ac83cd9
--- /dev/null
+++ b/streams/upgrade-system-tests-23/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
@@ -0,0 +1,622 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.tests;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
+
+import java.io.ByteArrayOutputStream;
+import java.io.PrintStream;
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static java.util.Collections.emptyMap;
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+
+public class SmokeTestDriver extends SmokeTestUtil {
+    private static final String[] TOPICS = {
+        "data",
+        "echo",
+        "max",
+        "min", "min-suppressed", "min-raw",
+        "dif",
+        "sum",
+        "sws-raw", "sws-suppressed",
+        "cnt",
+        "avg",
+        "tagg"
+    };
+
+    private static final int MAX_RECORD_EMPTY_RETRIES = 30;
+
+    private static class ValueList {
+        public final String key;
+        private final int[] values;
+        private int index;
+
+        ValueList(final int min, final int max) {
+            key = min + "-" + max;
+
+            values = new int[max - min + 1];
+            for (int i = 0; i < values.length; i++) {
+                values[i] = min + i;
+            }
+            // We want to randomize the order of data to test a processing order that is not completely predictable.
+            // However, values are also used as the timestamp of the record. (TODO: separate data and timestamp)
+            // We keep some correlation of time and order. Thus, the shuffling is done with a sliding window.
+            shuffle(values, 10);
+
+            index = 0;
+        }
+
+        int next() {
+            return (index < values.length) ? values[index++] : -1;
+        }
+    }
+
+    public static String[] topics() {
+        return Arrays.copyOf(TOPICS, TOPICS.length);
+    }
+
+    static void generatePerpetually(final String kafka,
+                                    final int numKeys,
+                                    final int maxRecordsPerKey) {
+        final Properties producerProps = generatorProperties(kafka);
+
+        int numRecordsProduced = 0;
+
+        final ValueList[] data = new ValueList[numKeys];
+        for (int i = 0; i < numKeys; i++) {
+            data[i] = new ValueList(i, i + maxRecordsPerKey - 1);
+        }
+
+        final Random rand = new Random();
+
+        try (final KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(producerProps)) {
+            while (true) {
+                final int index = rand.nextInt(numKeys);
+                final String key = data[index].key;
+                final int value = data[index].next();
+
+                final ProducerRecord<byte[], byte[]> record =
+                    new ProducerRecord<>(
+                        "data",
+                        stringSerde.serializer().serialize("", key),
+                        intSerde.serializer().serialize("", value)
+                    );
+
+                producer.send(record);
+
+                numRecordsProduced++;
+                if (numRecordsProduced % 100 == 0) {
+                    System.out.println(Instant.now() + " " + numRecordsProduced + " records produced");
+                }
+                Utils.sleep(2);
+            }
+        }
+    }
+
+    public static Map<String, Set<Integer>> generate(final String kafka,
+                                                     final int numKeys,
+                                                     final int maxRecordsPerKey,
+                                                     final Duration timeToSpend) {
+        final Properties producerProps = generatorProperties(kafka);
+
+
+        int numRecordsProduced = 0;
+
+        final Map<String, Set<Integer>> allData = new HashMap<>();
+        final ValueList[] data = new ValueList[numKeys];
+        for (int i = 0; i < numKeys; i++) {
+            data[i] = new ValueList(i, i + maxRecordsPerKey - 1);
+            allData.put(data[i].key, new HashSet<>());
+        }
+        final Random rand = new Random();
+
+        int remaining = data.length;
+
+        final long recordPauseTime = timeToSpend.toMillis() / numKeys / maxRecordsPerKey;
+
+        List<ProducerRecord<byte[], byte[]>> needRetry = new ArrayList<>();
+
+        try (final KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(producerProps)) {
+            while (remaining > 0) {
+                final int index = rand.nextInt(remaining);
+                final String key = data[index].key;
+                final int value = data[index].next();
+
+                if (value < 0) {
+                    remaining--;
+                    data[index] = data[remaining];
+                } else {
+
+                    final ProducerRecord<byte[], byte[]> record =
+                        new ProducerRecord<>(
+                            "data",
+                            stringSerde.serializer().serialize("", key),
+                            intSerde.serializer().serialize("", value)
+                        );
+
+                    producer.send(record, new TestCallback(record, needRetry));
+
+                    numRecordsProduced++;
+                    allData.get(key).add(value);
+                    if (numRecordsProduced % 100 == 0) {
+                        System.out.println(Instant.now() + " " + numRecordsProduced + " records produced");
+                    }
+                    Utils.sleep(Math.max(recordPauseTime, 2));
+                }
+            }
+            producer.flush();
+
+            int remainingRetries = 5;
+            while (!needRetry.isEmpty()) {
+                final List<ProducerRecord<byte[], byte[]>> needRetry2 = new ArrayList<>();
+                for (final ProducerRecord<byte[], byte[]> record : needRetry) {
+                    System.out.println("retry producing " + stringSerde.deserializer().deserialize("", record.key()));
+                    producer.send(record, new TestCallback(record, needRetry2));
+                }
+                producer.flush();
+                needRetry = needRetry2;
+
+                if (--remainingRetries == 0 && !needRetry.isEmpty()) {
+                    System.err.println("Failed to produce all records after multiple retries");
+                    Exit.exit(1);
+                }
+            }
+
+            // now that we've sent everything, we'll send some final records with a timestamp high enough to flush out
+            // all suppressed records.
+            final List<PartitionInfo> partitions = producer.partitionsFor("data");
+            for (final PartitionInfo partition : partitions) {
+                producer.send(new ProducerRecord<>(
+                    partition.topic(),
+                    partition.partition(),
+                    System.currentTimeMillis() + Duration.ofDays(2).toMillis(),
+                    stringSerde.serializer().serialize("", "flush"),
+                    intSerde.serializer().serialize("", 0)
+                ));
+            }
+        }
+        return Collections.unmodifiableMap(allData);
+    }
+
+    private static Properties generatorProperties(final String kafka) {
+        final Properties producerProps = new Properties();
+        producerProps.put(ProducerConfig.CLIENT_ID_CONFIG, "SmokeTest");
+        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
+        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
+        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
+        producerProps.put(ProducerConfig.ACKS_CONFIG, "all");
+        return producerProps;
+    }
+
+    private static class TestCallback implements Callback {
+        private final ProducerRecord<byte[], byte[]> originalRecord;
+        private final List<ProducerRecord<byte[], byte[]>> needRetry;
+
+        TestCallback(final ProducerRecord<byte[], byte[]> originalRecord,
+                     final List<ProducerRecord<byte[], byte[]>> needRetry) {
+            this.originalRecord = originalRecord;
+            this.needRetry = needRetry;
+        }
+
+        @Override
+        public void onCompletion(final RecordMetadata metadata, final Exception exception) {
+            if (exception != null) {
+                if (exception instanceof TimeoutException) {
+                    needRetry.add(originalRecord);
+                } else {
+                    exception.printStackTrace();
+                    Exit.exit(1);
+                }
+            }
+        }
+    }
+
+    private static void shuffle(final int[] data, @SuppressWarnings("SameParameterValue") final int windowSize) {
+        final Random rand = new Random();
+        for (int i = 0; i < data.length; i++) {
+            // we shuffle data within windowSize
+            final int j = rand.nextInt(Math.min(data.length - i, windowSize)) + i;
+
+            // swap
+            final int tmp = data[i];
+            data[i] = data[j];
+            data[j] = tmp;
+        }
+    }
+
+    public static class NumberDeserializer implements Deserializer<Number> {
+        @Override
+        public Number deserialize(final String topic, final byte[] data) {
+            final Number value;
+            switch (topic) {
+                case "data":
+                case "echo":
+                case "min":
+                case "min-raw":
+                case "min-suppressed":
+                case "sws-raw":
+                case "sws-suppressed":
+                case "max":
+                case "dif":
+                    value = intSerde.deserializer().deserialize(topic, data);
+                    break;
+                case "sum":
+                case "cnt":
+                case "tagg":
+                    value = longSerde.deserializer().deserialize(topic, data);
+                    break;
+                case "avg":
+                    value = doubleSerde.deserializer().deserialize(topic, data);
+                    break;
+                default:
+                    throw new RuntimeException("unknown topic: " + topic);
+            }
+            return value;
+        }
+    }
+
+    /**
+     * Reads all smoke-test output topics from the beginning and verifies them against {@code inputs},
+     * polling for at most six minutes. The returned result reflects only the {@code verifyAll} checks;
+     * echo delivery affects just the printed SUCCESS/FAILURE line.
+     */
+    public static VerificationResult verify(final String kafka,
+                                            final Map<String, Set<Integer>> inputs,
+                                            final int maxRecordsPerKey) {
+        final Properties props = new Properties();
+        props.put(ConsumerConfig.CLIENT_ID_CONFIG, "verifier");
+        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
+        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, NumberDeserializer.class);
+        props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
+
+        final int recordsGenerated = inputs.size() * maxRecordsPerKey;
+        int recordsProcessed = 0;
+        final Map<String, AtomicInteger> processed =
+            Stream.of(TOPICS)
+                  .collect(Collectors.toMap(t -> t, t -> new AtomicInteger(0)));
+        final Map<String, Map<String, LinkedList<ConsumerRecord<String, Number>>>> events = new HashMap<>();
+        VerificationResult verificationResult = new VerificationResult(false, "no results yet");
+        int retry = 0;
+        final long start = System.currentTimeMillis();
+        // try-with-resources: the consumer is closed even if partition lookup, polling or verification throws
+        try (final KafkaConsumer<String, Number> consumer = new KafkaConsumer<>(props)) {
+            final List<TopicPartition> partitions = getAllPartitions(consumer, TOPICS);
+            consumer.assign(partitions);
+            consumer.seekToBeginning(partitions);
+            while (System.currentTimeMillis() - start < TimeUnit.MINUTES.toMillis(6)) {
+                final ConsumerRecords<String, Number> records = consumer.poll(Duration.ofSeconds(5));
+                if (records.isEmpty() && recordsProcessed >= recordsGenerated) {
+                    verificationResult = verifyAll(inputs, events, false);
+                    if (verificationResult.passed()) {
+                        break;
+                    } else if (retry++ > MAX_RECORD_EMPTY_RETRIES) {
+                        System.out.println(Instant.now() + " Didn't get any more results, verification hasn't passed, and out of retries.");
+                        break;
+                    } else {
+                        System.out.println(Instant.now() + " Didn't get any more results, but verification hasn't passed (yet). Retrying..." + retry);
+                    }
+                } else {
+                    System.out.println(Instant.now() + " Get some more results from " + records.partitions() + ", resetting retry.");
+                    retry = 0;
+                    for (final ConsumerRecord<String, Number> record : records) {
+                        final String topic = record.topic();
+                        processed.get(topic).incrementAndGet();
+                        if (topic.equals("echo")) {
+                            recordsProcessed++;
+                            if (recordsProcessed % 100 == 0) {
+                                System.out.println("Echo records processed = " + recordsProcessed);
+                            }
+                        }
+                        events.computeIfAbsent(topic, t -> new HashMap<>())
+                              .computeIfAbsent(record.key(), k -> new LinkedList<>())
+                              .add(record);
+                    }
+                    System.out.println(processed);
+                }
+            }
+        }
+        final long finished = System.currentTimeMillis() - start;
+        System.out.println("Verification time=" + finished);
+        System.out.println("-------------------");
+        System.out.println("Result Verification");
+        System.out.println("-------------------");
+        System.out.println("recordGenerated=" + recordsGenerated);
+        System.out.println("recordProcessed=" + recordsProcessed);
+
+        if (recordsProcessed > recordsGenerated) {
+            System.out.println("PROCESSED-MORE-THAN-GENERATED");
+        } else if (recordsProcessed < recordsGenerated) {
+            System.out.println("PROCESSED-LESS-THAN-GENERATED");
+        }
+
+        // the echo topic may never have been seen (e.g. on timeout), so fall back to an empty map
+        final Map<String, Set<Number>> received =
+            events.getOrDefault("echo", emptyMap())
+                  .entrySet()
+                  .stream()
+                  .map(entry -> mkEntry(
+                      entry.getKey(),
+                      entry.getValue().stream().map(ConsumerRecord::value).collect(Collectors.toSet()))
+                  )
+                  .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+
+        boolean success = inputs.equals(received);
+        if (success) {
+            System.out.println("ALL-RECORDS-DELIVERED");
+        } else {
+            // count the generated values that never showed up on the echo topic
+            int missedCount = 0;
+            for (final Map.Entry<String, Set<Integer>> entry : inputs.entrySet()) {
+                final Set<Number> delivered = received.get(entry.getKey());
+                for (final Integer value : entry.getValue()) {
+                    if (delivered == null || !delivered.contains(value)) {
+                        missedCount++;
+                    }
+                }
+            }
+            System.out.println("missedRecords=" + missedCount);
+        }
+
+        // give it one more try if it's not already passing.
+        if (!verificationResult.passed()) {
+            verificationResult = verifyAll(inputs, events, true);
+        }
+        success &= verificationResult.passed();
+
+        System.out.println(verificationResult.result());
+        System.out.println(success ? "SUCCESS" : "FAILURE");
+        return verificationResult;
+    }
+    /** Immutable outcome of a verification run: a pass/fail flag and the accompanying result text. */
+    public static class VerificationResult {
+        private final boolean passed;
+        private final String result;
+
+        VerificationResult(final boolean passed, final String result) {
+            this.passed = passed;
+            this.result = result;
+        }
+
+        public boolean passed() {
+            return passed;
+        }
+
+        public String result() {
+            return result;
+        }
+    }
+    /** Runs every per-topic check ({@code &=} does not short-circuit, so none is skipped) and captures their output as the result text. */
+    private static VerificationResult verifyAll(final Map<String, Set<Integer>> inputs,
+                                                final Map<String, Map<String, LinkedList<ConsumerRecord<String, Number>>>> events,
+                                                final boolean printResults) {
+        final ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+        boolean pass;
+        try (final PrintStream resultStream = new PrintStream(byteArrayOutputStream)) {
+            pass = verifyTAgg(resultStream, inputs, events.get("tagg"), printResults);
+            pass &= verifySuppressed(resultStream, "min-suppressed", events, printResults);
+            pass &= verify(resultStream, "min-suppressed", inputs, events, windowedKey -> {
+                final String unwindowedKey = windowedKey.substring(1, windowedKey.length() - 1).replaceAll("@.*", "");
+                return getMin(unwindowedKey);
+            }, printResults);
+            pass &= verifySuppressed(resultStream, "sws-suppressed", events, printResults);
+            pass &= verify(resultStream, "min", inputs, events, SmokeTestDriver::getMin, printResults);
+            pass &= verify(resultStream, "max", inputs, events, SmokeTestDriver::getMax, printResults);
+            pass &= verify(resultStream, "dif", inputs, events, key -> getMax(key).intValue() - getMin(key).intValue(), printResults);
+            pass &= verify(resultStream, "sum", inputs, events, SmokeTestDriver::getSum, printResults);
+            pass &= verify(resultStream, "cnt", inputs, events, key1 -> getMax(key1).intValue() - getMin(key1).intValue() + 1L, printResults);
+            pass &= verify(resultStream, "avg", inputs, events, SmokeTestDriver::getAvg, printResults);
+        }
+        return new VerificationResult(pass, new String(byteArrayOutputStream.toByteArray(), StandardCharsets.UTF_8));
+    }
+
+    /**
+     * Checks that, for every key seen on {@code topic}, the last record's value equals the value
+     * computed from the key by {@code keyToExpectation}; failure details go to {@code resultStream}.
+     * @return {@code true} only if the topic is non-empty, has as many keys as {@code inputData}, and every key matches
+     */
+    private static boolean verify(final PrintStream resultStream,
+                                  final String topic,
+                                  final Map<String, Set<Integer>> inputData,
+                                  final Map<String, Map<String, LinkedList<ConsumerRecord<String, Number>>>> events,
+                                  final Function<String, Number> keyToExpectation,
+                                  final boolean printResults) {
+        final Map<String, LinkedList<ConsumerRecord<String, Number>>> outputEvents = events.getOrDefault(topic, emptyMap());
+        if (outputEvents.isEmpty()) {
+            resultStream.println(topic + " is empty");
+            return false;
+        }
+        resultStream.printf("verifying %s with %d keys%n", topic, outputEvents.size());
+        if (outputEvents.size() != inputData.size()) {
+            resultStream.printf("fail: resultCount=%d expectedCount=%s%n\tresult=%s%n\texpected=%s%n",
+                                outputEvents.size(), inputData.size(), outputEvents.keySet(), inputData.keySet());
+            return false;
+        }
+        for (final Map.Entry<String, LinkedList<ConsumerRecord<String, Number>>> entry : outputEvents.entrySet()) {
+            final String key = entry.getKey();
+            final Number expected = keyToExpectation.apply(key);
+            final Number actual = entry.getValue().getLast().value();
+            if (!expected.equals(actual)) {
+                resultStream.printf("%s fail: key=%s actual=%s expected=%s%n", topic, key, actual, expected);
+                if (printResults) {
+                    // every lookup falls back to an empty list: a windowed key or an unseen topic must not turn the report into an NPE
+                    resultStream.printf("\t inputEvents=%n%s%n\t" +
+                            "echoEvents=%n%s%n\tmaxEvents=%n%s%n\tminEvents=%n%s%n\tdifEvents=%n%s%n\tcntEvents=%n%s%n\ttaggEvents=%n%s%n",
+                        indent("\t\t", events.getOrDefault("data", emptyMap()).getOrDefault(key, new LinkedList<>())),
+                        indent("\t\t", events.getOrDefault("echo", emptyMap()).getOrDefault(key, new LinkedList<>())),
+                        indent("\t\t", events.getOrDefault("max", emptyMap()).getOrDefault(key, new LinkedList<>())),
+                        indent("\t\t", events.getOrDefault("min", emptyMap()).getOrDefault(key, new LinkedList<>())),
+                        indent("\t\t", events.getOrDefault("dif", emptyMap()).getOrDefault(key, new LinkedList<>())),
+                        indent("\t\t", events.getOrDefault("cnt", emptyMap()).getOrDefault(key, new LinkedList<>())),
+                        indent("\t\t", events.getOrDefault("tagg", emptyMap()).getOrDefault(key, new LinkedList<>())));
+                    if (!Utils.mkSet("echo", "max", "min", "dif", "cnt", "tagg").contains(topic))
+                        resultStream.printf("%sEvents=%n%s%n", topic, indent("\t\t", entry.getValue()));
+                }
+                return false;
+            }
+        }
+        return true;
+    }
+
+    /**
+     * Checks that each key on the suppressed {@code topic} has exactly one record; the first violation is reported and fails the check.
+     */
+    private static boolean verifySuppressed(final PrintStream resultStream,
+                                            @SuppressWarnings("SameParameterValue") final String topic,
+                                            final Map<String, Map<String, LinkedList<ConsumerRecord<String, Number>>>> events,
+                                            final boolean printResults) {
+        resultStream.println("verifying suppressed " + topic);
+        final Map<String, LinkedList<ConsumerRecord<String, Number>>> topicEvents = events.getOrDefault(topic, emptyMap());
+        for (final Map.Entry<String, LinkedList<ConsumerRecord<String, Number>>> entry : topicEvents.entrySet()) {
+            if (entry.getValue().size() != 1) {
+                final String unsuppressedTopic = topic.replace("-suppressed", "-raw");
+                final String key = entry.getKey();
+                final String unwindowedKey = key.substring(1, key.length() - 1).replaceAll("@.*", "");
+                resultStream.printf("fail: key=%s%n\tnon-unique result:%n%s%n", key, indent("\t\t", entry.getValue()));
+                if (printResults) {
+                    // fall back to empty lists so a missing raw/input entry cannot mask the failure with an NPE
+                    resultStream.printf("\tresultEvents:%n%s%n\tinputEvents:%n%s%n",
+                        indent("\t\t", events.getOrDefault(unsuppressedTopic, emptyMap()).getOrDefault(key, new LinkedList<>())),
+                        indent("\t\t", events.getOrDefault("data", emptyMap()).getOrDefault(unwindowedKey, new LinkedList<>())));
+                }
+                return false;
+            }
+        }
+        return true;
+    }
+    /** Renders each record on its own line, prefixed with {@code prefix}. */
+    private static String indent(@SuppressWarnings("SameParameterValue") final String prefix,
+                                 final Iterable<ConsumerRecord<String, Number>> list) {
+        final StringBuilder stringBuilder = new StringBuilder();
+        for (final ConsumerRecord<String, Number> record : list) {
+            stringBuilder.append(prefix).append(record).append('\n');
+        }
+        return stringBuilder.toString();
+    }
+    /** Sum of all integers from the key's min to its max (arithmetic-series formula, evaluated in long arithmetic). */
+    private static Long getSum(final String key) {
+        final int min = getMin(key).intValue();
+        final int max = getMax(key).intValue();
+        return ((long) min + max) * (max - min + 1L) / 2L;
+    }
+    /** Midpoint of the key's min and max, as a double. */
+    private static Double getAvg(final String key) {
+        final int min = getMin(key).intValue();
+        final int max = getMax(key).intValue();
+        return ((long) min + max) / 2.0;
+    }
+
+    /** Verifies "tagg": the last value of each tagg key must equal the number of input keys whose count (max - min + 1) equals that key. */
+    private static boolean verifyTAgg(final PrintStream resultStream,
+                                      final Map<String, Set<Integer>> allData,
+                                      final Map<String, LinkedList<ConsumerRecord<String, Number>>> taggEvents,
+                                      final boolean printResults) {
+        if (taggEvents == null) {
+            resultStream.println("tagg is missing");
+            return false;
+        } else if (taggEvents.isEmpty()) {
+            resultStream.println("tagg is empty");
+            return false;
+        } else {
+            resultStream.println("verifying tagg");
+
+            // generate expected answer
+            final Map<String, Long> expected = new HashMap<>();
+            for (final String key : allData.keySet()) {
+                final int min = getMin(key).intValue();
+                final int max = getMax(key).intValue();
+                final String cnt = Long.toString(max - min + 1L);
+
+                expected.put(cnt, expected.getOrDefault(cnt, 0L) + 1);
+            }
+
+            // check the result
+            for (final Map.Entry<String, LinkedList<ConsumerRecord<String, Number>>> entry : taggEvents.entrySet()) {
+                final String key = entry.getKey();
+                Long expectedCount = expected.remove(key);
+                if (expectedCount == null) {
+                    expectedCount = 0L;
+                }
+
+                if (entry.getValue().getLast().value().longValue() != expectedCount) {
+                    resultStream.println("fail: key=" + key + " tagg=" + entry.getValue() + " expected=" + expectedCount);
+
+                    if (printResults)
+                        resultStream.println("\t taggEvents: " + entry.getValue());
+                    return false;
+                }
+            }
+            // NOTE(review): keys still left in 'expected' (never seen on tagg) are not reported as failures - confirm this is intended
+        }
+        return true;
+    }
+    /** Parses the first {@code '-'}-separated field of the key as the minimum value. */
+    private static Number getMin(final String key) {
+        return Integer.parseInt(key.split("-")[0]);
+    }
+    /** Parses the second {@code '-'}-separated field of the key as the maximum value. */
+    private static Number getMax(final String key) {
+        return Integer.parseInt(key.split("-")[1]);
+    }
+    /** Builds a {@link TopicPartition} for every partition the consumer reports for the given topics. */
+    private static List<TopicPartition> getAllPartitions(final KafkaConsumer<?, ?> consumer, final String... topics) {
+        final List<TopicPartition> partitions = new ArrayList<>();
+
+        for (final String topic : topics) {
+            for (final PartitionInfo info : consumer.partitionsFor(topic)) {
+                partitions.add(new TopicPartition(info.topic(), info.partition()));
+            }
+        }
+        return partitions;
+    }
+
+}
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java b/streams/upgrade-system-tests-23/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java
similarity index 70%
copy from streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java
copy to streams/upgrade-system-tests-23/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java
index 90e6ccd..e8ec04c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java
+++ b/streams/upgrade-system-tests-23/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java
@@ -28,7 +28,6 @@ import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
 
-import java.io.File;
 import java.time.Instant;
 
 public class SmokeTestUtil {
@@ -45,12 +44,17 @@ public class SmokeTestUtil {
             public Processor<Object, Object> get() {
                 return new AbstractProcessor<Object, Object>() {
                     private int numRecordsProcessed = 0;
+                    private long smallestOffset = Long.MAX_VALUE;
+                    private long largestOffset = Long.MIN_VALUE;
 
                     @Override
                     public void init(final ProcessorContext context) {
                         super.init(context);
                         System.out.println("[DEV] initializing processor: topic=" + topic + " taskId=" + context.taskId());
+                        System.out.flush();
                         numRecordsProcessed = 0;
+                        smallestOffset = Long.MAX_VALUE;
+                        largestOffset = Long.MIN_VALUE;
                     }
 
                     @Override
@@ -60,6 +64,27 @@ public class SmokeTestUtil {
                             System.out.printf("%s: %s%n", name, Instant.now());
                             System.out.println("processed " + numRecordsProcessed + " records from topic=" + topic);
                         }
+
+                        if (smallestOffset > context().offset()) {
+                            smallestOffset = context().offset();
+                        }
+                        if (largestOffset < context().offset()) {
+                            largestOffset = context().offset();
+                        }
+                    }
+
+                    @Override
+                    public void close() {
+                        System.out.printf("Close processor for task %s%n", context().taskId());
+                        System.out.println("processed " + numRecordsProcessed + " records");
+                        final long processed;
+                        if (largestOffset >= smallestOffset) {
+                            processed = 1L + largestOffset - smallestOffset;
+                        } else {
+                            processed = 0L;
+                        }
+                        System.out.println("offset " + smallestOffset + " to " + largestOffset + " -> processed " + processed);
+                        System.out.flush();
                     }
                 };
             }
@@ -76,39 +101,19 @@ public class SmokeTestUtil {
     public static class Agg {
 
         KeyValueMapper<String, Long, KeyValue<String, Long>> selector() {
-            return new KeyValueMapper<String, Long, KeyValue<String, Long>>() {
-                @Override
-                public KeyValue<String, Long> apply(final String key, final Long value) {
-                    return new KeyValue<>(value == null ? null : Long.toString(value), 1L);
-                }
-            };
+            return (key, value) -> new KeyValue<>(value == null ? null : Long.toString(value), 1L);
         }
 
         public Initializer<Long> init() {
-            return new Initializer<Long>() {
-                @Override
-                public Long apply() {
-                    return 0L;
-                }
-            };
+            return () -> 0L;
         }
 
         Aggregator<String, Long, Long> adder() {
-            return new Aggregator<String, Long, Long>() {
-                @Override
-                public Long apply(final String aggKey, final Long value, final Long aggregate) {
-                    return aggregate + value;
-                }
-            };
+            return (aggKey, value, aggregate) -> aggregate + value;
         }
 
         Aggregator<String, Long, Long> remover() {
-            return new Aggregator<String, Long, Long>() {
-                @Override
-                public Long apply(final String aggKey, final Long value, final Long aggregate) {
-                    return aggregate - value;
-                }
-            };
+            return (aggKey, value, aggregate) -> aggregate - value;
         }
     }
 
@@ -120,14 +125,6 @@ public class SmokeTestUtil {
 
     static Serde<Double> doubleSerde = Serdes.Double();
 
-    static File createDir(final File parent, final String child) {
-        final File dir = new File(parent, child);
-
-        dir.mkdir();
-
-        return dir;
-    }
-
     public static void sleep(final long duration) {
         try {
             Thread.sleep(duration);
diff --git a/streams/upgrade-system-tests-23/src/test/java/org/apache/kafka/streams/tests/StreamsSmokeTest.java b/streams/upgrade-system-tests-23/src/test/java/org/apache/kafka/streams/tests/StreamsSmokeTest.java
new file mode 100644
index 0000000..07c7d5d
--- /dev/null
+++ b/streams/upgrade-system-tests-23/src/test/java/org/apache/kafka/streams/tests/StreamsSmokeTest.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.tests;
+
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.StreamsConfig;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.UUID;
+
+import static org.apache.kafka.streams.tests.SmokeTestDriver.generate;
+import static org.apache.kafka.streams.tests.SmokeTestDriver.generatePerpetually;
+
+public class StreamsSmokeTest {
+
+    /**
+     *  args ::= propFileName command [disableAutoTerminate]
+     *  command := "run" | "process"
+     * The bootstrap servers are read from the properties file, not from the command line.
+     *
+     * @param args see the grammar above; the mere presence of a third argument (any value) disables auto-termination
+     */
+    public static void main(final String[] args) throws IOException {
+        if (args.length < 2) {
+            System.err.println("StreamsSmokeTest are expecting two parameters: propFile, command; but only see " + args.length + " parameter");
+            System.exit(1);
+        }
+
+        final String propFileName = args[0];
+        final String command = args[1];
+        final boolean disableAutoTerminate = args.length > 2;
+
+        final Properties streamsProperties = Utils.loadProps(propFileName);
+        final String kafka = streamsProperties.getProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG);
+        final String processingGuarantee = streamsProperties.getProperty(StreamsConfig.PROCESSING_GUARANTEE_CONFIG);
+
+        if (kafka == null) {
+            System.err.println("No bootstrap kafka servers specified in " + StreamsConfig.BOOTSTRAP_SERVERS_CONFIG);
+            System.exit(1);
+        }
+
+        if ("process".equals(command)) {
+            if (!StreamsConfig.AT_LEAST_ONCE.equals(processingGuarantee) &&
+                !StreamsConfig.EXACTLY_ONCE.equals(processingGuarantee)) {
+
+                System.err.println("processingGuarantee must be either " + StreamsConfig.AT_LEAST_ONCE + " or " +
+                    StreamsConfig.EXACTLY_ONCE);
+
+                System.exit(1);
+            }
+        }
+
+        System.out.println("StreamsTest instance started (StreamsSmokeTest)");
+        System.out.println("command=" + command);
+        System.out.println("props=" + streamsProperties);
+        System.out.println("disableAutoTerminate=" + disableAutoTerminate);
+
+        switch (command) {
+            case "run":
+                // this starts the driver (data generation and result verification)
+                final int numKeys = 10;
+                final int maxRecordsPerKey = 500;
+                if (disableAutoTerminate) {
+                    generatePerpetually(kafka, numKeys, maxRecordsPerKey);
+                } else {
+                    // slow down data production to span 30 seconds so that system tests have time to
+                    // do their bounces, etc.
+                    final Map<String, Set<Integer>> allData =
+                        generate(kafka, numKeys, maxRecordsPerKey, Duration.ofSeconds(30));
+                    SmokeTestDriver.verify(kafka, allData, maxRecordsPerKey);
+                }
+                break;
+            case "process":
+                // this starts the stream processing app
+                new SmokeTestClient(UUID.randomUUID().toString()).start(streamsProperties);
+                break;
+            default:
+                System.out.println("unknown command: " + command);
+        }
+    }
+
+}
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java b/streams/upgrade-system-tests-24/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java
similarity index 79%
copy from streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java
copy to streams/upgrade-system-tests-24/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java
index db243fd..ced1369 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java
+++ b/streams/upgrade-system-tests-24/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java
@@ -16,11 +16,10 @@
  */
 package org.apache.kafka.streams.tests;
 
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Bytes;
-import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.KafkaThread;
+import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
@@ -38,11 +37,15 @@ import org.apache.kafka.streams.kstream.TimeWindows;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.state.Stores;
 import org.apache.kafka.streams.state.WindowStore;
-import org.apache.kafka.test.TestUtils;
 
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
 import java.time.Duration;
 import java.time.Instant;
 import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 
 import static org.apache.kafka.streams.kstream.Suppressed.untilWindowCloses;
 
@@ -50,14 +53,42 @@ public class SmokeTestClient extends SmokeTestUtil {
 
     private final String name;
 
-    private Thread thread;
     private KafkaStreams streams;
     private boolean uncaughtException = false;
     private boolean started;
-    private boolean closed;
+    private volatile boolean closed;
+
+    private static void addShutdownHook(final String name, final Runnable runnable) {
+        if (name != null) {
+            Runtime.getRuntime().addShutdownHook(KafkaThread.nonDaemon(name, runnable));
+        } else {
+            Runtime.getRuntime().addShutdownHook(new Thread(runnable));
+        }
+    }
+
+    private static File tempDirectory() {
+        final String prefix = "kafka-";
+        final File file;
+        try {
+            file = Files.createTempDirectory(prefix).toFile();
+        } catch (final IOException ex) {
+            throw new RuntimeException("Failed to create a temp dir", ex);
+        }
+        file.deleteOnExit();
+
+        addShutdownHook("delete-temp-file-shutdown-hook", () -> {
+            try {
+                Utils.delete(file);
+            } catch (final IOException e) {
+                System.out.println("Error deleting " + file.getAbsolutePath());
+                e.printStackTrace(System.out);
+            }
+        });
+
+        return file;
+    }
 
     public SmokeTestClient(final String name) {
-        super();
         this.name = name;
     }
 
@@ -70,17 +101,43 @@ public class SmokeTestClient extends SmokeTestUtil {
     }
 
     public void start(final Properties streamsProperties) {
-        streams = createKafkaStreams(streamsProperties);
+        final Topology build = getTopology();
+        streams = new KafkaStreams(build, getStreamsConfig(streamsProperties));
+
+        final CountDownLatch countDownLatch = new CountDownLatch(1);
+        streams.setStateListener((newState, oldState) -> {
+            System.out.printf("%s %s: %s -> %s%n", name, Instant.now(), oldState, newState);
+            if (oldState == KafkaStreams.State.REBALANCING && newState == KafkaStreams.State.RUNNING) {
+                started = true;
+                countDownLatch.countDown();
+            }
+
+            if (newState == KafkaStreams.State.NOT_RUNNING) {
+                closed = true;
+            }
+        });
+
         streams.setUncaughtExceptionHandler((t, e) -> {
             System.out.println(name + ": SMOKE-TEST-CLIENT-EXCEPTION");
+            System.out.println(name + ": FATAL: An unexpected exception is encountered on thread " + t + ": " + e);
+            e.printStackTrace(System.out);
             uncaughtException = true;
-            e.printStackTrace();
+            streams.close(Duration.ofSeconds(30));
         });
 
-        Exit.addShutdownHook("streams-shutdown-hook", () -> close());
+        addShutdownHook("streams-shutdown-hook", this::close);
 
-        thread = new Thread(() -> streams.start());
-        thread.start();
+        streams.start();
+        try {
+            if (!countDownLatch.await(1, TimeUnit.MINUTES)) {
+                System.out.println(name + ": SMOKE-TEST-CLIENT-EXCEPTION: Didn't start in one minute");
+            }
+        } catch (final InterruptedException e) {
+            System.out.println(name + ": SMOKE-TEST-CLIENT-EXCEPTION: " + e);
+            e.printStackTrace(System.out);
+        }
+        System.out.println(name + ": SMOKE-TEST-CLIENT-STARTED");
+        System.out.println(name + " started at " + Instant.now());
     }
 
     public void closeAsync() {
@@ -88,17 +145,14 @@ public class SmokeTestClient extends SmokeTestUtil {
     }
 
     public void close() {
-        streams.close(Duration.ofSeconds(5));
-        // do not remove these printouts since they are needed for health scripts
-        if (!uncaughtException) {
+        final boolean closed = streams.close(Duration.ofMinutes(1));
+
+        if (closed && !uncaughtException) {
             System.out.println(name + ": SMOKE-TEST-CLIENT-CLOSED");
-        }
-        try {
-            thread.join();
-        } catch (final Exception ex) {
-            // do not remove these printouts since they are needed for health scripts
+        } else if (closed) {
             System.out.println(name + ": SMOKE-TEST-CLIENT-EXCEPTION");
-            // ignore
+        } else {
+            System.out.println(name + ": SMOKE-TEST-CLIENT-EXCEPTION: Didn't close");
         }
     }
 
@@ -106,39 +160,11 @@ public class SmokeTestClient extends SmokeTestUtil {
         final Properties fullProps = new Properties(props);
         fullProps.put(StreamsConfig.APPLICATION_ID_CONFIG, "SmokeTest");
         fullProps.put(StreamsConfig.CLIENT_ID_CONFIG, "SmokeTest-" + name);
-        fullProps.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 3);
-        fullProps.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 2);
-        fullProps.put(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, 100);
-        fullProps.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
-        fullProps.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3);
-        fullProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-        fullProps.put(ProducerConfig.ACKS_CONFIG, "all");
-        fullProps.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
+        fullProps.put(StreamsConfig.STATE_DIR_CONFIG, tempDirectory().getAbsolutePath());
         fullProps.putAll(props);
         return fullProps;
     }
 
-    private KafkaStreams createKafkaStreams(final Properties props) {
-        final Topology build = getTopology();
-        final KafkaStreams streamsClient = new KafkaStreams(build, getStreamsConfig(props));
-        streamsClient.setStateListener((newState, oldState) -> {
-            System.out.printf("%s %s: %s -> %s%n", name, Instant.now(), oldState, newState);
-            if (oldState == KafkaStreams.State.REBALANCING && newState == KafkaStreams.State.RUNNING) {
-                started = true;
-            }
-
-            if (newState == KafkaStreams.State.NOT_RUNNING) {
-                closed = true;
-            }
-        });
-        streamsClient.setUncaughtExceptionHandler((t, e) -> {
-            System.out.println(name + ": FATAL: An unexpected exception is encountered on thread " + t + ": " + e);
-            streamsClient.close(Duration.ofSeconds(30));
-        });
-
-        return streamsClient;
-    }
-
     public Topology getTopology() {
         final StreamsBuilder builder = new StreamsBuilder();
         final Consumed<String, Integer> stringIntConsumed = Consumed.with(stringSerde, intSerde);
diff --git a/streams/upgrade-system-tests-24/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java b/streams/upgrade-system-tests-24/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
new file mode 100644
index 0000000..ac83cd9
--- /dev/null
+++ b/streams/upgrade-system-tests-24/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
@@ -0,0 +1,622 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.tests;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
+
+import java.io.ByteArrayOutputStream;
+import java.io.PrintStream;
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static java.util.Collections.emptyMap;
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+
+public class SmokeTestDriver extends SmokeTestUtil {
+    private static final String[] TOPICS = {
+        "data",
+        "echo",
+        "max",
+        "min", "min-suppressed", "min-raw",
+        "dif",
+        "sum",
+        "sws-raw", "sws-suppressed",
+        "cnt",
+        "avg",
+        "tagg"
+    };
+
+    private static final int MAX_RECORD_EMPTY_RETRIES = 30;
+
+    private static class ValueList {
+        public final String key;
+        private final int[] values;
+        private int index;
+
+        ValueList(final int min, final int max) {
+            key = min + "-" + max;
+
+            values = new int[max - min + 1];
+            for (int i = 0; i < values.length; i++) {
+                values[i] = min + i;
+            }
+            // We randomize the order of the data so that the processing order is not completely predictable.
+            // However, values are also used as the timestamp of the record (TODO: separate data and timestamp),
+            // so we keep some correlation between time and order by shuffling within a sliding window.
+            shuffle(values, 10);
+
+            index = 0;
+        }
+
+        int next() {
+            return (index < values.length) ? values[index++] : -1;
+        }
+    }
+
+    public static String[] topics() {
+        return Arrays.copyOf(TOPICS, TOPICS.length);
+    }
+
+    static void generatePerpetually(final String kafka,
+                                    final int numKeys,
+                                    final int maxRecordsPerKey) {
+        final Properties producerProps = generatorProperties(kafka);
+
+        int numRecordsProduced = 0;
+
+        final ValueList[] data = new ValueList[numKeys];
+        for (int i = 0; i < numKeys; i++) {
+            data[i] = new ValueList(i, i + maxRecordsPerKey - 1);
+        }
+
+        final Random rand = new Random();
+
+        try (final KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(producerProps)) {
+            while (true) {
+                final int index = rand.nextInt(numKeys);
+                final String key = data[index].key;
+                final int value = data[index].next();
+
+                final ProducerRecord<byte[], byte[]> record =
+                    new ProducerRecord<>(
+                        "data",
+                        stringSerde.serializer().serialize("", key),
+                        intSerde.serializer().serialize("", value)
+                    );
+
+                producer.send(record);
+
+                numRecordsProduced++;
+                if (numRecordsProduced % 100 == 0) {
+                    System.out.println(Instant.now() + " " + numRecordsProduced + " records produced");
+                }
+                Utils.sleep(2);
+            }
+        }
+    }
+
+    public static Map<String, Set<Integer>> generate(final String kafka,
+                                                     final int numKeys,
+                                                     final int maxRecordsPerKey,
+                                                     final Duration timeToSpend) {
+        final Properties producerProps = generatorProperties(kafka);
+
+
+        int numRecordsProduced = 0;
+
+        final Map<String, Set<Integer>> allData = new HashMap<>();
+        final ValueList[] data = new ValueList[numKeys];
+        for (int i = 0; i < numKeys; i++) {
+            data[i] = new ValueList(i, i + maxRecordsPerKey - 1);
+            allData.put(data[i].key, new HashSet<>());
+        }
+        final Random rand = new Random();
+
+        int remaining = data.length;
+
+        final long recordPauseTime = timeToSpend.toMillis() / numKeys / maxRecordsPerKey;
+
+        List<ProducerRecord<byte[], byte[]>> needRetry = new ArrayList<>();
+
+        try (final KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(producerProps)) {
+            while (remaining > 0) {
+                final int index = rand.nextInt(remaining);
+                final String key = data[index].key;
+                final int value = data[index].next();
+
+                if (value < 0) {
+                    remaining--;
+                    data[index] = data[remaining];
+                } else {
+
+                    final ProducerRecord<byte[], byte[]> record =
+                        new ProducerRecord<>(
+                            "data",
+                            stringSerde.serializer().serialize("", key),
+                            intSerde.serializer().serialize("", value)
+                        );
+
+                    producer.send(record, new TestCallback(record, needRetry));
+
+                    numRecordsProduced++;
+                    allData.get(key).add(value);
+                    if (numRecordsProduced % 100 == 0) {
+                        System.out.println(Instant.now() + " " + numRecordsProduced + " records produced");
+                    }
+                    Utils.sleep(Math.max(recordPauseTime, 2));
+                }
+            }
+            producer.flush();
+
+            int remainingRetries = 5;
+            while (!needRetry.isEmpty()) {
+                final List<ProducerRecord<byte[], byte[]>> needRetry2 = new ArrayList<>();
+                for (final ProducerRecord<byte[], byte[]> record : needRetry) {
+                    System.out.println("retry producing " + stringSerde.deserializer().deserialize("", record.key()));
+                    producer.send(record, new TestCallback(record, needRetry2));
+                }
+                producer.flush();
+                needRetry = needRetry2;
+
+                if (--remainingRetries == 0 && !needRetry.isEmpty()) {
+                    System.err.println("Failed to produce all records after multiple retries");
+                    Exit.exit(1);
+                }
+            }
+
+            // now that we've sent everything, we'll send some final records with a timestamp high enough to flush out
+            // all suppressed records.
+            final List<PartitionInfo> partitions = producer.partitionsFor("data");
+            for (final PartitionInfo partition : partitions) {
+                producer.send(new ProducerRecord<>(
+                    partition.topic(),
+                    partition.partition(),
+                    System.currentTimeMillis() + Duration.ofDays(2).toMillis(),
+                    stringSerde.serializer().serialize("", "flush"),
+                    intSerde.serializer().serialize("", 0)
+                ));
+            }
+        }
+        return Collections.unmodifiableMap(allData);
+    }
+
+    private static Properties generatorProperties(final String kafka) {
+        final Properties producerProps = new Properties();
+        producerProps.put(ProducerConfig.CLIENT_ID_CONFIG, "SmokeTest");
+        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
+        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
+        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
+        producerProps.put(ProducerConfig.ACKS_CONFIG, "all");
+        return producerProps;
+    }
+
+    private static class TestCallback implements Callback {
+        private final ProducerRecord<byte[], byte[]> originalRecord;
+        private final List<ProducerRecord<byte[], byte[]>> needRetry;
+
+        TestCallback(final ProducerRecord<byte[], byte[]> originalRecord,
+                     final List<ProducerRecord<byte[], byte[]>> needRetry) {
+            this.originalRecord = originalRecord;
+            this.needRetry = needRetry;
+        }
+
+        @Override
+        public void onCompletion(final RecordMetadata metadata, final Exception exception) {
+            if (exception != null) {
+                if (exception instanceof TimeoutException) {
+                    needRetry.add(originalRecord);
+                } else {
+                    exception.printStackTrace();
+                    Exit.exit(1);
+                }
+            }
+        }
+    }
+
+    private static void shuffle(final int[] data, @SuppressWarnings("SameParameterValue") final int windowSize) {
+        final Random rand = new Random();
+        for (int i = 0; i < data.length; i++) {
+            // we shuffle data within windowSize
+            final int j = rand.nextInt(Math.min(data.length - i, windowSize)) + i;
+
+            // swap
+            final int tmp = data[i];
+            data[i] = data[j];
+            data[j] = tmp;
+        }
+    }
+
+    public static class NumberDeserializer implements Deserializer<Number> {
+        @Override
+        public Number deserialize(final String topic, final byte[] data) {
+            final Number value;
+            switch (topic) {
+                case "data":
+                case "echo":
+                case "min":
+                case "min-raw":
+                case "min-suppressed":
+                case "sws-raw":
+                case "sws-suppressed":
+                case "max":
+                case "dif":
+                    value = intSerde.deserializer().deserialize(topic, data);
+                    break;
+                case "sum":
+                case "cnt":
+                case "tagg":
+                    value = longSerde.deserializer().deserialize(topic, data);
+                    break;
+                case "avg":
+                    value = doubleSerde.deserializer().deserialize(topic, data);
+                    break;
+                default:
+                    throw new RuntimeException("unknown topic: " + topic);
+            }
+            return value;
+        }
+    }
+
+    public static VerificationResult verify(final String kafka,
+                                            final Map<String, Set<Integer>> inputs,
+                                            final int maxRecordsPerKey) {
+        final Properties props = new Properties();
+        props.put(ConsumerConfig.CLIENT_ID_CONFIG, "verifier");
+        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
+        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, NumberDeserializer.class);
+        props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
+
+        final KafkaConsumer<String, Number> consumer = new KafkaConsumer<>(props);
+        final List<TopicPartition> partitions = getAllPartitions(consumer, TOPICS);
+        consumer.assign(partitions);
+        consumer.seekToBeginning(partitions);
+
+        final int recordsGenerated = inputs.size() * maxRecordsPerKey;
+        int recordsProcessed = 0;
+        final Map<String, AtomicInteger> processed =
+            Stream.of(TOPICS)
+                  .collect(Collectors.toMap(t -> t, t -> new AtomicInteger(0)));
+
+        final Map<String, Map<String, LinkedList<ConsumerRecord<String, Number>>>> events = new HashMap<>();
+
+        VerificationResult verificationResult = new VerificationResult(false, "no results yet");
+        int retry = 0;
+        final long start = System.currentTimeMillis();
+        while (System.currentTimeMillis() - start < TimeUnit.MINUTES.toMillis(6)) {
+            final ConsumerRecords<String, Number> records = consumer.poll(Duration.ofSeconds(5));
+            if (records.isEmpty() && recordsProcessed >= recordsGenerated) {
+                verificationResult = verifyAll(inputs, events, false);
+                if (verificationResult.passed()) {
+                    break;
+                } else if (retry++ > MAX_RECORD_EMPTY_RETRIES) {
+                    System.out.println(Instant.now() + " Didn't get any more results, verification hasn't passed, and out of retries.");
+                    break;
+                } else {
+                    System.out.println(Instant.now() + " Didn't get any more results, but verification hasn't passed (yet). Retrying..." + retry);
+                }
+            } else {
+                System.out.println(Instant.now() + " Get some more results from " + records.partitions() + ", resetting retry.");
+
+                retry = 0;
+                for (final ConsumerRecord<String, Number> record : records) {
+                    final String key = record.key();
+
+                    final String topic = record.topic();
+                    processed.get(topic).incrementAndGet();
+
+                    if (topic.equals("echo")) {
+                        recordsProcessed++;
+                        if (recordsProcessed % 100 == 0) {
+                            System.out.println("Echo records processed = " + recordsProcessed);
+                        }
+                    }
+
+                    events.computeIfAbsent(topic, t -> new HashMap<>())
+                          .computeIfAbsent(key, k -> new LinkedList<>())
+                          .add(record);
+                }
+
+                System.out.println(processed);
+            }
+        }
+        consumer.close();
+        final long finished = System.currentTimeMillis() - start;
+        System.out.println("Verification time=" + finished);
+        System.out.println("-------------------");
+        System.out.println("Result Verification");
+        System.out.println("-------------------");
+        System.out.println("recordGenerated=" + recordsGenerated);
+        System.out.println("recordProcessed=" + recordsProcessed);
+
+        if (recordsProcessed > recordsGenerated) {
+            System.out.println("PROCESSED-MORE-THAN-GENERATED");
+        } else if (recordsProcessed < recordsGenerated) {
+            System.out.println("PROCESSED-LESS-THAN-GENERATED");
+        }
+
+        boolean success;
+
+        final Map<String, Set<Number>> received =
+            events.get("echo")
+                  .entrySet()
+                  .stream()
+                  .map(entry -> mkEntry(
+                      entry.getKey(),
+                      entry.getValue().stream().map(ConsumerRecord::value).collect(Collectors.toSet()))
+                  )
+                  .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+
+        success = inputs.equals(received);
+
+        if (success) {
+            System.out.println("ALL-RECORDS-DELIVERED");
+        } else {
+            int missedCount = 0;
+            for (final Map.Entry<String, Set<Integer>> entry : inputs.entrySet()) {
+                missedCount += received.get(entry.getKey()).size();
+            }
+            System.out.println("missedRecords=" + missedCount);
+        }
+
+        // give it one more try if it's not already passing.
+        if (!verificationResult.passed()) {
+            verificationResult = verifyAll(inputs, events, true);
+        }
+        success &= verificationResult.passed();
+
+        System.out.println(verificationResult.result());
+
+        System.out.println(success ? "SUCCESS" : "FAILURE");
+        return verificationResult;
+    }
+
+    public static class VerificationResult {
+        private final boolean passed;
+        private final String result;
+
+        VerificationResult(final boolean passed, final String result) {
+            this.passed = passed;
+            this.result = result;
+        }
+
+        public boolean passed() {
+            return passed;
+        }
+
+        public String result() {
+            return result;
+        }
+    }
+
+    private static VerificationResult verifyAll(final Map<String, Set<Integer>> inputs,
+                                                final Map<String, Map<String, LinkedList<ConsumerRecord<String, Number>>>> events,
+                                                final boolean printResults) {
+        final ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+        boolean pass;
+        try (final PrintStream resultStream = new PrintStream(byteArrayOutputStream)) {
+            pass = verifyTAgg(resultStream, inputs, events.get("tagg"), printResults);
+            pass &= verifySuppressed(resultStream, "min-suppressed", events, printResults);
+            pass &= verify(resultStream, "min-suppressed", inputs, events, windowedKey -> {
+                final String unwindowedKey = windowedKey.substring(1, windowedKey.length() - 1).replaceAll("@.*", "");
+                return getMin(unwindowedKey);
+            }, printResults);
+            pass &= verifySuppressed(resultStream, "sws-suppressed", events, printResults);
+            pass &= verify(resultStream, "min", inputs, events, SmokeTestDriver::getMin, printResults);
+            pass &= verify(resultStream, "max", inputs, events, SmokeTestDriver::getMax, printResults);
+            pass &= verify(resultStream, "dif", inputs, events, key -> getMax(key).intValue() - getMin(key).intValue(), printResults);
+            pass &= verify(resultStream, "sum", inputs, events, SmokeTestDriver::getSum, printResults);
+            pass &= verify(resultStream, "cnt", inputs, events, key1 -> getMax(key1).intValue() - getMin(key1).intValue() + 1L, printResults);
+            pass &= verify(resultStream, "avg", inputs, events, SmokeTestDriver::getAvg, printResults);
+        }
+        return new VerificationResult(pass, new String(byteArrayOutputStream.toByteArray(), StandardCharsets.UTF_8));
+    }
+
+    private static boolean verify(final PrintStream resultStream,
+                                  final String topic,
+                                  final Map<String, Set<Integer>> inputData,
+                                  final Map<String, Map<String, LinkedList<ConsumerRecord<String, Number>>>> events,
+                                  final Function<String, Number> keyToExpectation,
+                                  final boolean printResults) {
+        final Map<String, LinkedList<ConsumerRecord<String, Number>>> observedInputEvents = events.get("data");
+        final Map<String, LinkedList<ConsumerRecord<String, Number>>> outputEvents = events.getOrDefault(topic, emptyMap());
+        if (outputEvents.isEmpty()) {
+            resultStream.println(topic + " is empty");
+            return false;
+        } else {
+            resultStream.printf("verifying %s with %d keys%n", topic, outputEvents.size());
+
+            if (outputEvents.size() != inputData.size()) {
+                resultStream.printf("fail: resultCount=%d expectedCount=%s%n\tresult=%s%n\texpected=%s%n",
+                                    outputEvents.size(), inputData.size(), outputEvents.keySet(), inputData.keySet());
+                return false;
+            }
+            for (final Map.Entry<String, LinkedList<ConsumerRecord<String, Number>>> entry : outputEvents.entrySet()) {
+                final String key = entry.getKey();
+                final Number expected = keyToExpectation.apply(key);
+                final Number actual = entry.getValue().getLast().value();
+                if (!expected.equals(actual)) {
+                    resultStream.printf("%s fail: key=%s actual=%s expected=%s%n", topic, key, actual, expected);
+
+                    if (printResults) {
+                        resultStream.printf("\t inputEvents=%n%s%n\t" +
+                                "echoEvents=%n%s%n\tmaxEvents=%n%s%n\tminEvents=%n%s%n\tdifEvents=%n%s%n\tcntEvents=%n%s%n\ttaggEvents=%n%s%n",
+                            indent("\t\t", observedInputEvents.get(key)),
+                            indent("\t\t", events.getOrDefault("echo", emptyMap()).getOrDefault(key, new LinkedList<>())),
+                            indent("\t\t", events.getOrDefault("max", emptyMap()).getOrDefault(key, new LinkedList<>())),
+                            indent("\t\t", events.getOrDefault("min", emptyMap()).getOrDefault(key, new LinkedList<>())),
+                            indent("\t\t", events.getOrDefault("dif", emptyMap()).getOrDefault(key, new LinkedList<>())),
+                            indent("\t\t", events.getOrDefault("cnt", emptyMap()).getOrDefault(key, new LinkedList<>())),
+                            indent("\t\t", events.getOrDefault("tagg", emptyMap()).getOrDefault(key, new LinkedList<>())));
+
+                        if (!Utils.mkSet("echo", "max", "min", "dif", "cnt", "tagg").contains(topic))
+                            resultStream.printf("%sEvents=%n%s%n", topic, indent("\t\t", entry.getValue()));
+                    }
+
+                    return false;
+                }
+            }
+            return true;
+        }
+    }
+
+
+    private static boolean verifySuppressed(final PrintStream resultStream,
+                                            @SuppressWarnings("SameParameterValue") final String topic,
+                                            final Map<String, Map<String, LinkedList<ConsumerRecord<String, Number>>>> events,
+                                            final boolean printResults) {
+        resultStream.println("verifying suppressed " + topic);
+        final Map<String, LinkedList<ConsumerRecord<String, Number>>> topicEvents = events.getOrDefault(topic, emptyMap());
+        for (final Map.Entry<String, LinkedList<ConsumerRecord<String, Number>>> entry : topicEvents.entrySet()) {
+            if (entry.getValue().size() != 1) {
+                final String unsuppressedTopic = topic.replace("-suppressed", "-raw");
+                final String key = entry.getKey();
+                final String unwindowedKey = key.substring(1, key.length() - 1).replaceAll("@.*", "");
+                resultStream.printf("fail: key=%s%n\tnon-unique result:%n%s%n",
+                                    key,
+                                    indent("\t\t", entry.getValue()));
+
+                if (printResults)
+                    resultStream.printf("\tresultEvents:%n%s%n\tinputEvents:%n%s%n",
+                        indent("\t\t", events.get(unsuppressedTopic).get(key)),
+                        indent("\t\t", events.get("data").get(unwindowedKey)));
+
+                return false;
+            }
+        }
+        return true;
+    }
+
+    private static String indent(@SuppressWarnings("SameParameterValue") final String prefix,
+                                 final Iterable<ConsumerRecord<String, Number>> list) {
+        final StringBuilder stringBuilder = new StringBuilder();
+        for (final ConsumerRecord<String, Number> record : list) {
+            stringBuilder.append(prefix).append(record).append('\n');
+        }
+        return stringBuilder.toString();
+    }
+
+    private static Long getSum(final String key) {
+        final int min = getMin(key).intValue();
+        final int max = getMax(key).intValue();
+        return ((long) min + max) * (max - min + 1L) / 2L;
+    }
+
+    private static Double getAvg(final String key) {
+        final int min = getMin(key).intValue();
+        final int max = getMax(key).intValue();
+        return ((long) min + max) / 2.0;
+    }
+
+
+    private static boolean verifyTAgg(final PrintStream resultStream,
+                                      final Map<String, Set<Integer>> allData,
+                                      final Map<String, LinkedList<ConsumerRecord<String, Number>>> taggEvents,
+                                      final boolean printResults) {
+        if (taggEvents == null) {
+            resultStream.println("tagg is missing");
+            return false;
+        } else if (taggEvents.isEmpty()) {
+            resultStream.println("tagg is empty");
+            return false;
+        } else {
+            resultStream.println("verifying tagg");
+
+            // generate expected answer
+            final Map<String, Long> expected = new HashMap<>();
+            for (final String key : allData.keySet()) {
+                final int min = getMin(key).intValue();
+                final int max = getMax(key).intValue();
+                final String cnt = Long.toString(max - min + 1L);
+
+                expected.put(cnt, expected.getOrDefault(cnt, 0L) + 1);
+            }
+
+            // check the result
+            for (final Map.Entry<String, LinkedList<ConsumerRecord<String, Number>>> entry : taggEvents.entrySet()) {
+                final String key = entry.getKey();
+                Long expectedCount = expected.remove(key);
+                if (expectedCount == null) {
+                    expectedCount = 0L;
+                }
+
+                if (entry.getValue().getLast().value().longValue() != expectedCount) {
+                    resultStream.println("fail: key=" + key + " tagg=" + entry.getValue() + " expected=" + expectedCount);
+
+                    if (printResults)
+                        resultStream.println("\t taggEvents: " + entry.getValue());
+                    return false;
+                }
+            }
+
+        }
+        return true;
+    }
+
+    private static Number getMin(final String key) {
+        return Integer.parseInt(key.split("-")[0]);
+    }
+
+    private static Number getMax(final String key) {
+        return Integer.parseInt(key.split("-")[1]);
+    }
+
+    private static List<TopicPartition> getAllPartitions(final KafkaConsumer<?, ?> consumer, final String... topics) {
+        final List<TopicPartition> partitions = new ArrayList<>();
+
+        for (final String topic : topics) {
+            for (final PartitionInfo info : consumer.partitionsFor(topic)) {
+                partitions.add(new TopicPartition(info.topic(), info.partition()));
+            }
+        }
+        return partitions;
+    }
+
+}
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java b/streams/upgrade-system-tests-24/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java
similarity index 70%
copy from streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java
copy to streams/upgrade-system-tests-24/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java
index 90e6ccd..e8ec04c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java
+++ b/streams/upgrade-system-tests-24/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java
@@ -28,7 +28,6 @@ import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
 
-import java.io.File;
 import java.time.Instant;
 
 public class SmokeTestUtil {
@@ -45,12 +44,17 @@ public class SmokeTestUtil {
             public Processor<Object, Object> get() {
                 return new AbstractProcessor<Object, Object>() {
                     private int numRecordsProcessed = 0;
+                    private long smallestOffset = Long.MAX_VALUE;
+                    private long largestOffset = Long.MIN_VALUE;
 
                     @Override
                     public void init(final ProcessorContext context) {
                         super.init(context);
                         System.out.println("[DEV] initializing processor: topic=" + topic + " taskId=" + context.taskId());
+                        System.out.flush();
                         numRecordsProcessed = 0;
+                        smallestOffset = Long.MAX_VALUE;
+                        largestOffset = Long.MIN_VALUE;
                     }
 
                     @Override
@@ -60,6 +64,27 @@ public class SmokeTestUtil {
                             System.out.printf("%s: %s%n", name, Instant.now());
                             System.out.println("processed " + numRecordsProcessed + " records from topic=" + topic);
                         }
+
+                        if (smallestOffset > context().offset()) {
+                            smallestOffset = context().offset();
+                        }
+                        if (largestOffset < context().offset()) {
+                            largestOffset = context().offset();
+                        }
+                    }
+
+                    @Override
+                    public void close() {
+                        System.out.printf("Close processor for task %s%n", context().taskId());
+                        System.out.println("processed " + numRecordsProcessed + " records");
+                        final long processed;
+                        if (largestOffset >= smallestOffset) {
+                            processed = 1L + largestOffset - smallestOffset;
+                        } else {
+                            processed = 0L;
+                        }
+                        System.out.println("offset " + smallestOffset + " to " + largestOffset + " -> processed " + processed);
+                        System.out.flush();
                     }
                 };
             }
@@ -76,39 +101,19 @@ public class SmokeTestUtil {
     public static class Agg {
 
         KeyValueMapper<String, Long, KeyValue<String, Long>> selector() {
-            return new KeyValueMapper<String, Long, KeyValue<String, Long>>() {
-                @Override
-                public KeyValue<String, Long> apply(final String key, final Long value) {
-                    return new KeyValue<>(value == null ? null : Long.toString(value), 1L);
-                }
-            };
+            return (key, value) -> new KeyValue<>(value == null ? null : Long.toString(value), 1L);
         }
 
         public Initializer<Long> init() {
-            return new Initializer<Long>() {
-                @Override
-                public Long apply() {
-                    return 0L;
-                }
-            };
+            return () -> 0L;
         }
 
         Aggregator<String, Long, Long> adder() {
-            return new Aggregator<String, Long, Long>() {
-                @Override
-                public Long apply(final String aggKey, final Long value, final Long aggregate) {
-                    return aggregate + value;
-                }
-            };
+            return (aggKey, value, aggregate) -> aggregate + value;
         }
 
         Aggregator<String, Long, Long> remover() {
-            return new Aggregator<String, Long, Long>() {
-                @Override
-                public Long apply(final String aggKey, final Long value, final Long aggregate) {
-                    return aggregate - value;
-                }
-            };
+            return (aggKey, value, aggregate) -> aggregate - value;
         }
     }
 
@@ -120,14 +125,6 @@ public class SmokeTestUtil {
 
     static Serde<Double> doubleSerde = Serdes.Double();
 
-    static File createDir(final File parent, final String child) {
-        final File dir = new File(parent, child);
-
-        dir.mkdir();
-
-        return dir;
-    }
-
     public static void sleep(final long duration) {
         try {
             Thread.sleep(duration);
diff --git a/streams/upgrade-system-tests-24/src/test/java/org/apache/kafka/streams/tests/StreamsSmokeTest.java b/streams/upgrade-system-tests-24/src/test/java/org/apache/kafka/streams/tests/StreamsSmokeTest.java
new file mode 100644
index 0000000..07c7d5d
--- /dev/null
+++ b/streams/upgrade-system-tests-24/src/test/java/org/apache/kafka/streams/tests/StreamsSmokeTest.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.tests;
+
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.StreamsConfig;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.UUID;
+
+import static org.apache.kafka.streams.tests.SmokeTestDriver.generate;
+import static org.apache.kafka.streams.tests.SmokeTestDriver.generatePerpetually;
+
+public class StreamsSmokeTest {
+
+    /**
+     * Entry point. args ::= propFileName command [disableAutoTerminate]
+     *  command := "run" (data driver and verifier) | "process" (stream processing app)
+     *
+     * @param args properties file path, the command, and an optional third argument; if present, "run" generates data perpetually instead of generating once and verifying
+     */
+    public static void main(final String[] args) throws IOException {
+        if (args.length < 2) {
+            System.err.println("StreamsSmokeTest are expecting two parameters: propFile, command; but only see " + args.length + " parameter");
+            System.exit(1);
+        }
+
+        final String propFileName = args[0];
+        final String command = args[1];
+        final boolean disableAutoTerminate = args.length > 2; // only the presence of a third argument matters, not its value
+
+        final Properties streamsProperties = Utils.loadProps(propFileName);
+        final String kafka = streamsProperties.getProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG);
+        final String processingGuarantee = streamsProperties.getProperty(StreamsConfig.PROCESSING_GUARANTEE_CONFIG);
+
+        if (kafka == null) {
+            System.err.println("No bootstrap kafka servers specified in " + StreamsConfig.BOOTSTRAP_SERVERS_CONFIG);
+            System.exit(1);
+        }
+        // "process" needs an explicit processing guarantee in the properties file; a missing (null) value is rejected as well
+        if ("process".equals(command)) {
+            if (!StreamsConfig.AT_LEAST_ONCE.equals(processingGuarantee) &&
+                !StreamsConfig.EXACTLY_ONCE.equals(processingGuarantee)) {
+
+                System.err.println("processingGuarantee must be either " + StreamsConfig.AT_LEAST_ONCE + " or " +
+                    StreamsConfig.EXACTLY_ONCE);
+
+                System.exit(1);
+            }
+        }
+
+        System.out.println("StreamsTest instance started (StreamsSmokeTest)");
+        System.out.println("command=" + command);
+        System.out.println("props=" + streamsProperties);
+        System.out.println("disableAutoTerminate=" + disableAutoTerminate);
+
+        switch (command) {
+            case "run":
+                // this starts the driver (data generation and result verification)
+                final int numKeys = 10;
+                final int maxRecordsPerKey = 500;
+                if (disableAutoTerminate) {
+                    generatePerpetually(kafka, numKeys, maxRecordsPerKey);
+                } else {
+                    // slow down data production to span 30 seconds so that system tests have time to
+                    // do their bounces, etc.
+                    final Map<String, Set<Integer>> allData =
+                        generate(kafka, numKeys, maxRecordsPerKey, Duration.ofSeconds(30));
+                    SmokeTestDriver.verify(kafka, allData, maxRecordsPerKey);
+                }
+                break;
+            case "process":
+                // this starts the stream processing app under a random UUID as the client name
+                new SmokeTestClient(UUID.randomUUID().toString()).start(streamsProperties);
+                break;
+            default:
+                System.out.println("unknown command: " + command); // NOTE(review): stdout only; main then returns without calling System.exit
+        }
+    }
+
+}
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java b/streams/upgrade-system-tests-25/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java
similarity index 79%
copy from streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java
copy to streams/upgrade-system-tests-25/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java
index db243fd..ced1369 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java
+++ b/streams/upgrade-system-tests-25/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java
@@ -16,11 +16,10 @@
  */
 package org.apache.kafka.streams.tests;
 
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Bytes;
-import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.KafkaThread;
+import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
@@ -38,11 +37,15 @@ import org.apache.kafka.streams.kstream.TimeWindows;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.state.Stores;
 import org.apache.kafka.streams.state.WindowStore;
-import org.apache.kafka.test.TestUtils;
 
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
 import java.time.Duration;
 import java.time.Instant;
 import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 
 import static org.apache.kafka.streams.kstream.Suppressed.untilWindowCloses;
 
@@ -50,14 +53,42 @@ public class SmokeTestClient extends SmokeTestUtil {
 
     private final String name;
 
-    private Thread thread;
     private KafkaStreams streams;
     private boolean uncaughtException = false;
     private boolean started;
-    private boolean closed;
+    private volatile boolean closed;
+
+    private static void addShutdownHook(final String name, final Runnable runnable) {
+        if (name != null) {
+            Runtime.getRuntime().addShutdownHook(KafkaThread.nonDaemon(name, runnable));
+        } else {
+            Runtime.getRuntime().addShutdownHook(new Thread(runnable));
+        }
+    }
+
+    private static File tempDirectory() {
+        final String prefix = "kafka-";
+        final File file;
+        try {
+            file = Files.createTempDirectory(prefix).toFile();
+        } catch (final IOException ex) {
+            throw new RuntimeException("Failed to create a temp dir", ex);
+        }
+        file.deleteOnExit();
+
+        addShutdownHook("delete-temp-file-shutdown-hook", () -> {
+            try {
+                Utils.delete(file);
+            } catch (final IOException e) {
+                System.out.println("Error deleting " + file.getAbsolutePath());
+                e.printStackTrace(System.out);
+            }
+        });
+
+        return file;
+    }
 
     public SmokeTestClient(final String name) {
-        super();
         this.name = name;
     }
 
@@ -70,17 +101,43 @@ public class SmokeTestClient extends SmokeTestUtil {
     }
 
     public void start(final Properties streamsProperties) {
-        streams = createKafkaStreams(streamsProperties);
+        final Topology build = getTopology();
+        streams = new KafkaStreams(build, getStreamsConfig(streamsProperties));
+
+        final CountDownLatch countDownLatch = new CountDownLatch(1);
+        streams.setStateListener((newState, oldState) -> {
+            System.out.printf("%s %s: %s -> %s%n", name, Instant.now(), oldState, newState);
+            if (oldState == KafkaStreams.State.REBALANCING && newState == KafkaStreams.State.RUNNING) {
+                started = true;
+                countDownLatch.countDown();
+            }
+
+            if (newState == KafkaStreams.State.NOT_RUNNING) {
+                closed = true;
+            }
+        });
+
         streams.setUncaughtExceptionHandler((t, e) -> {
             System.out.println(name + ": SMOKE-TEST-CLIENT-EXCEPTION");
+            System.out.println(name + ": FATAL: An unexpected exception is encountered on thread " + t + ": " + e);
+            e.printStackTrace(System.out);
             uncaughtException = true;
-            e.printStackTrace();
+            streams.close(Duration.ofSeconds(30));
         });
 
-        Exit.addShutdownHook("streams-shutdown-hook", () -> close());
+        addShutdownHook("streams-shutdown-hook", this::close);
 
-        thread = new Thread(() -> streams.start());
-        thread.start();
+        streams.start();
+        try {
+            if (!countDownLatch.await(1, TimeUnit.MINUTES)) {
+                System.out.println(name + ": SMOKE-TEST-CLIENT-EXCEPTION: Didn't start in one minute");
+            }
+        } catch (final InterruptedException e) {
+            System.out.println(name + ": SMOKE-TEST-CLIENT-EXCEPTION: " + e);
+            e.printStackTrace(System.out);
+        }
+        System.out.println(name + ": SMOKE-TEST-CLIENT-STARTED");
+        System.out.println(name + " started at " + Instant.now());
     }
 
     public void closeAsync() {
@@ -88,17 +145,14 @@ public class SmokeTestClient extends SmokeTestUtil {
     }
 
     public void close() {
-        streams.close(Duration.ofSeconds(5));
-        // do not remove these printouts since they are needed for health scripts
-        if (!uncaughtException) {
+        final boolean closed = streams.close(Duration.ofMinutes(1));
+
+        if (closed && !uncaughtException) {
             System.out.println(name + ": SMOKE-TEST-CLIENT-CLOSED");
-        }
-        try {
-            thread.join();
-        } catch (final Exception ex) {
-            // do not remove these printouts since they are needed for health scripts
+        } else if (closed) {
             System.out.println(name + ": SMOKE-TEST-CLIENT-EXCEPTION");
-            // ignore
+        } else {
+            System.out.println(name + ": SMOKE-TEST-CLIENT-EXCEPTION: Didn't close");
         }
     }
 
@@ -106,39 +160,11 @@ public class SmokeTestClient extends SmokeTestUtil {
         final Properties fullProps = new Properties(props);
         fullProps.put(StreamsConfig.APPLICATION_ID_CONFIG, "SmokeTest");
         fullProps.put(StreamsConfig.CLIENT_ID_CONFIG, "SmokeTest-" + name);
-        fullProps.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 3);
-        fullProps.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 2);
-        fullProps.put(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, 100);
-        fullProps.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
-        fullProps.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3);
-        fullProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-        fullProps.put(ProducerConfig.ACKS_CONFIG, "all");
-        fullProps.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
+        fullProps.put(StreamsConfig.STATE_DIR_CONFIG, tempDirectory().getAbsolutePath());
         fullProps.putAll(props);
         return fullProps;
     }
 
-    private KafkaStreams createKafkaStreams(final Properties props) {
-        final Topology build = getTopology();
-        final KafkaStreams streamsClient = new KafkaStreams(build, getStreamsConfig(props));
-        streamsClient.setStateListener((newState, oldState) -> {
-            System.out.printf("%s %s: %s -> %s%n", name, Instant.now(), oldState, newState);
-            if (oldState == KafkaStreams.State.REBALANCING && newState == KafkaStreams.State.RUNNING) {
-                started = true;
-            }
-
-            if (newState == KafkaStreams.State.NOT_RUNNING) {
-                closed = true;
-            }
-        });
-        streamsClient.setUncaughtExceptionHandler((t, e) -> {
-            System.out.println(name + ": FATAL: An unexpected exception is encountered on thread " + t + ": " + e);
-            streamsClient.close(Duration.ofSeconds(30));
-        });
-
-        return streamsClient;
-    }
-
     public Topology getTopology() {
         final StreamsBuilder builder = new StreamsBuilder();
         final Consumed<String, Integer> stringIntConsumed = Consumed.with(stringSerde, intSerde);
diff --git a/streams/upgrade-system-tests-25/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java b/streams/upgrade-system-tests-25/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
new file mode 100644
index 0000000..ac83cd9
--- /dev/null
+++ b/streams/upgrade-system-tests-25/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
@@ -0,0 +1,622 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.tests;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
+
+import java.io.ByteArrayOutputStream;
+import java.io.PrintStream;
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static java.util.Collections.emptyMap;
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+
+public class SmokeTestDriver extends SmokeTestUtil {
+    private static final String[] TOPICS = {
+        "data",
+        "echo",
+        "max",
+        "min", "min-suppressed", "min-raw",
+        "dif",
+        "sum",
+        "sws-raw", "sws-suppressed",
+        "cnt",
+        "avg",
+        "tagg"
+    };
+
+    private static final int MAX_RECORD_EMPTY_RETRIES = 30;
+
+    private static class ValueList { // the ints min..max (inclusive) for key "min-max", handed out one at a time in shuffled order
+        public final String key;
+        private final int[] values;
+        private int index;
+
+        ValueList(final int min, final int max) {
+            key = min + "-" + max;
+
+            values = new int[max - min + 1];
+            for (int i = 0; i < values.length; i++) {
+                values[i] = min + i;
+            }
+            // We want to randomize the order of data so that the processing order is not completely predictable.
+            // However, values are also used as a timestamp of the record. (TODO: separate data and timestamp)
+            // We keep some correlation of time and order. Thus, the shuffling is done with a sliding window
+            shuffle(values, 10);
+
+            index = 0;
+        }
+
+        int next() {
+            return (index < values.length) ? values[index++] : -1; // -1 once every value has been handed out
+        }
+    }
+
+    public static String[] topics() {
+        return Arrays.copyOf(TOPICS, TOPICS.length); // fresh copy, so callers cannot modify the TOPICS array itself
+    }
+
+    static void generatePerpetually(final String kafka,
+                                    final int numKeys,
+                                    final int maxRecordsPerKey) {
+        final Properties producerProps = generatorProperties(kafka);
+        // Produces to the "data" topic in an endless loop; this method never returns normally.
+        int numRecordsProduced = 0;
+
+        final ValueList[] data = new ValueList[numKeys];
+        for (int i = 0; i < numKeys; i++) {
+            data[i] = new ValueList(i, i + maxRecordsPerKey - 1);
+        }
+
+        final Random rand = new Random();
+
+        try (final KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(producerProps)) {
+            while (true) {
+                final int index = rand.nextInt(numKeys);
+                final String key = data[index].key;
+                final int value = data[index].next();
+                // NOTE(review): next() yields -1 once a key's values are used up, and that -1 is still sent below
+                final ProducerRecord<byte[], byte[]> record =
+                    new ProducerRecord<>(
+                        "data",
+                        stringSerde.serializer().serialize("", key),
+                        intSerde.serializer().serialize("", value)
+                    );
+
+                producer.send(record);
+
+                numRecordsProduced++;
+                if (numRecordsProduced % 100 == 0) {
+                    System.out.println(Instant.now() + " " + numRecordsProduced + " records produced");
+                }
+                Utils.sleep(2);
+            }
+        }
+    }
+
+    public static Map<String, Set<Integer>> generate(final String kafka,
+                                                     final int numKeys,
+                                                     final int maxRecordsPerKey,
+                                                     final Duration timeToSpend) {
+        final Properties producerProps = generatorProperties(kafka);
+
+        // Sends every value of every key to "data" and returns, per key, the set of values that were sent.
+        int numRecordsProduced = 0;
+
+        final Map<String, Set<Integer>> allData = new HashMap<>();
+        final ValueList[] data = new ValueList[numKeys];
+        for (int i = 0; i < numKeys; i++) {
+            data[i] = new ValueList(i, i + maxRecordsPerKey - 1);
+            allData.put(data[i].key, new HashSet<>());
+        }
+        final Random rand = new Random();
+
+        int remaining = data.length;
+
+        final long recordPauseTime = timeToSpend.toMillis() / numKeys / maxRecordsPerKey; // pause per record in ms (integer division)
+
+        List<ProducerRecord<byte[], byte[]>> needRetry = new ArrayList<>();
+
+        try (final KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(producerProps)) {
+            while (remaining > 0) {
+                final int index = rand.nextInt(remaining);
+                final String key = data[index].key;
+                final int value = data[index].next();
+                // a negative value means this list is exhausted: overwrite its slot with the last live list and shrink the range
+                if (value < 0) {
+                    remaining--;
+                    data[index] = data[remaining];
+                } else {
+
+                    final ProducerRecord<byte[], byte[]> record =
+                        new ProducerRecord<>(
+                            "data",
+                            stringSerde.serializer().serialize("", key),
+                            intSerde.serializer().serialize("", value)
+                        );
+
+                    producer.send(record, new TestCallback(record, needRetry));
+
+                    numRecordsProduced++;
+                    allData.get(key).add(value);
+                    if (numRecordsProduced % 100 == 0) {
+                        System.out.println(Instant.now() + " " + numRecordsProduced + " records produced");
+                    }
+                    Utils.sleep(Math.max(recordPauseTime, 2));
+                }
+            }
+            producer.flush();
+            // resend the records collected in needRetry, in up to 5 rounds; exit if some are still pending after the last round
+            int remainingRetries = 5;
+            while (!needRetry.isEmpty()) {
+                final List<ProducerRecord<byte[], byte[]>> needRetry2 = new ArrayList<>();
+                for (final ProducerRecord<byte[], byte[]> record : needRetry) {
+                    System.out.println("retry producing " + stringSerde.deserializer().deserialize("", record.key()));
+                    producer.send(record, new TestCallback(record, needRetry2));
+                }
+                producer.flush();
+                needRetry = needRetry2;
+
+                if (--remainingRetries == 0 && !needRetry.isEmpty()) {
+                    System.err.println("Failed to produce all records after multiple retries");
+                    Exit.exit(1);
+                }
+            }
+
+            // now that we've sent everything, we'll send some final records with a timestamp high enough to flush out
+            // all suppressed records (one "flush" record per partition of "data", stamped two days in the future).
+            final List<PartitionInfo> partitions = producer.partitionsFor("data");
+            for (final PartitionInfo partition : partitions) {
+                producer.send(new ProducerRecord<>(
+                    partition.topic(),
+                    partition.partition(),
+                    System.currentTimeMillis() + Duration.ofDays(2).toMillis(),
+                    stringSerde.serializer().serialize("", "flush"),
+                    intSerde.serializer().serialize("", 0)
+                ));
+            }
+        }
+        return Collections.unmodifiableMap(allData);
+    }
+
+    private static Properties generatorProperties(final String kafka) { // producer settings used by generate and generatePerpetually
+        final Properties producerProps = new Properties();
+        producerProps.put(ProducerConfig.CLIENT_ID_CONFIG, "SmokeTest");
+        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
+        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class); // keys and values arrive already serialized
+        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
+        producerProps.put(ProducerConfig.ACKS_CONFIG, "all");
+        return producerProps;
+    }
+
+    private static class TestCallback implements Callback { // send callback: queues timed-out records for a retry, exits on any other error
+        private final ProducerRecord<byte[], byte[]> originalRecord;
+        private final List<ProducerRecord<byte[], byte[]>> needRetry;
+
+        TestCallback(final ProducerRecord<byte[], byte[]> originalRecord,
+                     final List<ProducerRecord<byte[], byte[]>> needRetry) {
+            this.originalRecord = originalRecord;
+            this.needRetry = needRetry;
+        }
+
+        @Override
+        public void onCompletion(final RecordMetadata metadata, final Exception exception) {
+            if (exception != null) {
+                if (exception instanceof TimeoutException) {
+                    needRetry.add(originalRecord); // the caller-supplied list is where the sender later looks for records to resend
+                } else {
+                    exception.printStackTrace();
+                    Exit.exit(1);
+                }
+            }
+        }
+    }
+
+    private static void shuffle(final int[] data, @SuppressWarnings("SameParameterValue") final int windowSize) {
+        final Random rand = new Random();
+        for (int i = 0; i < data.length; i++) {
+            // pick a swap partner j from [i, i + windowSize), clamped so that j never runs past the end of the array
+            final int j = rand.nextInt(Math.min(data.length - i, windowSize)) + i;
+
+            // swap data[i] and data[j] in place (j may equal i, which leaves the element where it is)
+            final int tmp = data[i];
+            data[i] = data[j];
+            data[j] = tmp;
+        }
+    }
+
+    public static class NumberDeserializer implements Deserializer<Number> { // chooses the value deserializer from the topic name
+        @Override
+        public Number deserialize(final String topic, final byte[] data) {
+            final Number value;
+            switch (topic) {
+                case "data":
+                case "echo":
+                case "min":
+                case "min-raw":
+                case "min-suppressed":
+                case "sws-raw":
+                case "sws-suppressed":
+                case "max":
+                case "dif":
+                    value = intSerde.deserializer().deserialize(topic, data);
+                    break;
+                case "sum":
+                case "cnt":
+                case "tagg":
+                    value = longSerde.deserializer().deserialize(topic, data);
+                    break;
+                case "avg":
+                    value = doubleSerde.deserializer().deserialize(topic, data);
+                    break;
+                default:
+                    throw new RuntimeException("unknown topic: " + topic); // any topic without a case above is rejected
+            }
+            return value;
+        }
+    }
+
+    public static VerificationResult verify(final String kafka,
+                                            final Map<String, Set<Integer>> inputs,
+                                            final int maxRecordsPerKey) {
+        final Properties props = new Properties();
+        props.put(ConsumerConfig.CLIENT_ID_CONFIG, "verifier");
+        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
+        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, NumberDeserializer.class);
+        props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
+
+        final KafkaConsumer<String, Number> consumer = new KafkaConsumer<>(props);
+        final List<TopicPartition> partitions = getAllPartitions(consumer, TOPICS);
+        consumer.assign(partitions);
+        consumer.seekToBeginning(partitions);
+
+        final int recordsGenerated = inputs.size() * maxRecordsPerKey;
+        int recordsProcessed = 0;
+        final Map<String, AtomicInteger> processed =
+            Stream.of(TOPICS)
+                  .collect(Collectors.toMap(t -> t, t -> new AtomicInteger(0)));
+
+        final Map<String, Map<String, LinkedList<ConsumerRecord<String, Number>>>> events = new HashMap<>();
+
+        VerificationResult verificationResult = new VerificationResult(false, "no results yet");
+        int retry = 0;
+        final long start = System.currentTimeMillis();
+        while (System.currentTimeMillis() - start < TimeUnit.MINUTES.toMillis(6)) {
+            final ConsumerRecords<String, Number> records = consumer.poll(Duration.ofSeconds(5));
+            if (records.isEmpty() && recordsProcessed >= recordsGenerated) {
+                verificationResult = verifyAll(inputs, events, false);
+                if (verificationResult.passed()) {
+                    break;
+                } else if (retry++ > MAX_RECORD_EMPTY_RETRIES) {
+                    System.out.println(Instant.now() + " Didn't get any more results, verification hasn't passed, and out of retries.");
+                    break;
+                } else {
+                    System.out.println(Instant.now() + " Didn't get any more results, but verification hasn't passed (yet). Retrying..." + retry);
+                }
+            } else {
+                System.out.println(Instant.now() + " Get some more results from " + records.partitions() + ", resetting retry.");
+
+                retry = 0;
+                for (final ConsumerRecord<String, Number> record : records) {
+                    final String key = record.key();
+
+                    final String topic = record.topic();
+                    processed.get(topic).incrementAndGet();
+
+                    if (topic.equals("echo")) {
+                        recordsProcessed++;
+                        if (recordsProcessed % 100 == 0) {
+                            System.out.println("Echo records processed = " + recordsProcessed);
+                        }
+                    }
+
+                    events.computeIfAbsent(topic, t -> new HashMap<>())
+                          .computeIfAbsent(key, k -> new LinkedList<>())
+                          .add(record);
+                }
+
+                System.out.println(processed);
+            }
+        }
+        consumer.close();
+        final long finished = System.currentTimeMillis() - start;
+        System.out.println("Verification time=" + finished);
+        System.out.println("-------------------");
+        System.out.println("Result Verification");
+        System.out.println("-------------------");
+        System.out.println("recordGenerated=" + recordsGenerated);
+        System.out.println("recordProcessed=" + recordsProcessed);
+
+        if (recordsProcessed > recordsGenerated) {
+            System.out.println("PROCESSED-MORE-THAN-GENERATED");
+        } else if (recordsProcessed < recordsGenerated) {
+            System.out.println("PROCESSED-LESS-THAN-GENERATED");
+        }
+
+        boolean success;
+
+        final Map<String, Set<Number>> received =
+            events.get("echo")
+                  .entrySet()
+                  .stream()
+                  .map(entry -> mkEntry(
+                      entry.getKey(),
+                      entry.getValue().stream().map(ConsumerRecord::value).collect(Collectors.toSet()))
+                  )
+                  .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+
+        success = inputs.equals(received);
+
+        if (success) {
+            System.out.println("ALL-RECORDS-DELIVERED");
+        } else {
+            int missedCount = 0;
+            for (final Map.Entry<String, Set<Integer>> entry : inputs.entrySet()) {
+                missedCount += received.get(entry.getKey()).size();
+            }
+            System.out.println("missedRecords=" + missedCount);
+        }
+
+        // give it one more try if it's not already passing.
+        if (!verificationResult.passed()) {
+            verificationResult = verifyAll(inputs, events, true);
+        }
+        success &= verificationResult.passed();
+
+        System.out.println(verificationResult.result());
+
+        System.out.println(success ? "SUCCESS" : "FAILURE");
+        return verificationResult;
+    }
+
+    /**
+     * Immutable outcome of a verification run: a pass/fail flag together with the report text.
+     */
+    public static class VerificationResult {
+        private final boolean passed;
+        private final String result;
+
+        VerificationResult(final boolean verificationPassed, final String report) {
+            passed = verificationPassed;
+            result = report;
+        }
+
+        /**
+         * @return the pass/fail flag given at construction
+         */
+        public boolean passed() {
+            return this.passed;
+        }
+
+        /**
+         * @return the report text given at construction
+         */
+        public String result() {
+            return this.result;
+        }
+    }
+
+    /**
+     * Runs every per-topic verification and collects their textual reports into a single result.
+     *
+     * <p>The checks are combined with the non-short-circuiting {@code &=}, so every topic is verified
+     * and reported even after an earlier one has failed.
+     *
+     * @param inputs       the generated input values, by key
+     * @param events       all consumed records, by topic and then by record key
+     * @param printResults whether failing checks should add the related events to the report
+     * @return whether all checks passed, plus the accumulated report text
+     */
+    private static VerificationResult verifyAll(final Map<String, Set<Integer>> inputs,
+                                                final Map<String, Map<String, LinkedList<ConsumerRecord<String, Number>>>> events,
+                                                final boolean printResults) {
+        final ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+        boolean pass;
+        // Encode explicitly as UTF-8: the bytes are decoded as UTF-8 below, and a PrintStream created
+        // without a charset would use the platform default encoding instead.
+        try (final PrintStream resultStream = new PrintStream(byteArrayOutputStream, false, StandardCharsets.UTF_8.name())) {
+            pass = verifyTAgg(resultStream, inputs, events.get("tagg"), printResults);
+            pass &= verifySuppressed(resultStream, "min-suppressed", events, printResults);
+            pass &= verify(resultStream, "min-suppressed", inputs, events, windowedKey -> {
+                // strip the surrounding brackets and everything from the '@' on to recover the plain record key
+                final String unwindowedKey = windowedKey.substring(1, windowedKey.length() - 1).replaceAll("@.*", "");
+                return getMin(unwindowedKey);
+            }, printResults);
+            pass &= verifySuppressed(resultStream, "sws-suppressed", events, printResults);
+            pass &= verify(resultStream, "min", inputs, events, SmokeTestDriver::getMin, printResults);
+            pass &= verify(resultStream, "max", inputs, events, SmokeTestDriver::getMax, printResults);
+            pass &= verify(resultStream, "dif", inputs, events, key -> getMax(key).intValue() - getMin(key).intValue(), printResults);
+            pass &= verify(resultStream, "sum", inputs, events, SmokeTestDriver::getSum, printResults);
+            pass &= verify(resultStream, "cnt", inputs, events, key1 -> getMax(key1).intValue() - getMin(key1).intValue() + 1L, printResults);
+            pass &= verify(resultStream, "avg", inputs, events, SmokeTestDriver::getAvg, printResults);
+        } catch (final java.io.UnsupportedEncodingException e) {
+            // every Java platform is required to support UTF-8, so this cannot happen in practice
+            throw new IllegalStateException("UTF-8 charset is not available", e);
+        }
+        return new VerificationResult(pass, new String(byteArrayOutputStream.toByteArray(), StandardCharsets.UTF_8));
+    }
+
+    /**
+     * Verifies that, for every key of the given output topic, the last observed value equals the
+     * expectation derived from the key.
+     *
+     * @param resultStream     sink for the verification report
+     * @param topic            the output topic to verify
+     * @param inputData        the generated input values, by key; only the number of keys is compared
+     * @param events           all consumed records, by topic and then by record key
+     * @param keyToExpectation computes the expected final value from an output record key
+     * @param printResults     whether a failure should also dump the related events of the other topics
+     * @return {@code true} if the topic has records, has as many keys as the input, and every key's last
+     *         value matches its expectation
+     */
+    private static boolean verify(final PrintStream resultStream,
+                                  final String topic,
+                                  final Map<String, Set<Integer>> inputData,
+                                  final Map<String, Map<String, LinkedList<ConsumerRecord<String, Number>>>> events,
+                                  final Function<String, Number> keyToExpectation,
+                                  final boolean printResults) {
+        final Map<String, LinkedList<ConsumerRecord<String, Number>>> outputEvents = events.getOrDefault(topic, emptyMap());
+        if (outputEvents.isEmpty()) {
+            resultStream.println(topic + " is empty");
+            return false;
+        }
+
+        resultStream.printf("verifying %s with %d keys%n", topic, outputEvents.size());
+
+        if (outputEvents.size() != inputData.size()) {
+            resultStream.printf("fail: resultCount=%d expectedCount=%s%n\tresult=%s%n\texpected=%s%n",
+                                outputEvents.size(), inputData.size(), outputEvents.keySet(), inputData.keySet());
+            return false;
+        }
+        for (final Map.Entry<String, LinkedList<ConsumerRecord<String, Number>>> entry : outputEvents.entrySet()) {
+            final String key = entry.getKey();
+            final Number expected = keyToExpectation.apply(key);
+            final Number actual = entry.getValue().getLast().value();
+            if (!expected.equals(actual)) {
+                resultStream.printf("%s fail: key=%s actual=%s expected=%s%n", topic, key, actual, expected);
+
+                if (printResults) {
+                    // Look up "data" as defensively as the other topics: it may have no events at all,
+                    // and it never contains the windowed keys used by the suppressed topics.
+                    resultStream.printf("\t inputEvents=%n%s%n\t" +
+                            "echoEvents=%n%s%n\tmaxEvents=%n%s%n\tminEvents=%n%s%n\tdifEvents=%n%s%n\tcntEvents=%n%s%n\ttaggEvents=%n%s%n",
+                        indent("\t\t", events.getOrDefault("data", emptyMap()).getOrDefault(key, new LinkedList<>())),
+                        indent("\t\t", events.getOrDefault("echo", emptyMap()).getOrDefault(key, new LinkedList<>())),
+                        indent("\t\t", events.getOrDefault("max", emptyMap()).getOrDefault(key, new LinkedList<>())),
+                        indent("\t\t", events.getOrDefault("min", emptyMap()).getOrDefault(key, new LinkedList<>())),
+                        indent("\t\t", events.getOrDefault("dif", emptyMap()).getOrDefault(key, new LinkedList<>())),
+                        indent("\t\t", events.getOrDefault("cnt", emptyMap()).getOrDefault(key, new LinkedList<>())),
+                        indent("\t\t", events.getOrDefault("tagg", emptyMap()).getOrDefault(key, new LinkedList<>())));
+
+                    // topics not covered by the dump above additionally print their own events
+                    if (!Utils.mkSet("echo", "max", "min", "dif", "cnt", "tagg").contains(topic)) {
+                        resultStream.printf("%sEvents=%n%s%n", topic, indent("\t\t", entry.getValue()));
+                    }
+                }
+
+                return false;
+            }
+        }
+        return true;
+    }
+
+
+    /**
+     * Verifies that a suppressed topic holds exactly one record per key.
+     *
+     * @param resultStream sink for the verification report
+     * @param topic        the suppressed topic to verify; its "-raw" counterpart is used for the failure dump
+     * @param events       all consumed records, by topic and then by record key
+     * @param printResults whether a failure should also dump the raw results and the input events
+     * @return {@code false} as soon as a key with a record count other than one is found, {@code true} otherwise
+     */
+    private static boolean verifySuppressed(final PrintStream resultStream,
+                                            @SuppressWarnings("SameParameterValue") final String topic,
+                                            final Map<String, Map<String, LinkedList<ConsumerRecord<String, Number>>>> events,
+                                            final boolean printResults) {
+        resultStream.println("verifying suppressed " + topic);
+        final Map<String, LinkedList<ConsumerRecord<String, Number>>> topicEvents = events.getOrDefault(topic, emptyMap());
+        for (final Map.Entry<String, LinkedList<ConsumerRecord<String, Number>>> entry : topicEvents.entrySet()) {
+            if (entry.getValue().size() != 1) {
+                final String key = entry.getKey();
+                resultStream.printf("fail: key=%s%n\tnon-unique result:%n%s%n",
+                                    key,
+                                    indent("\t\t", entry.getValue()));
+
+                if (printResults) {
+                    final String unsuppressedTopic = topic.replace("-suppressed", "-raw");
+                    // strip the surrounding brackets and everything from the '@' on to recover the plain record key
+                    final String unwindowedKey = key.substring(1, key.length() - 1).replaceAll("@.*", "");
+                    // The raw or data topic, or this particular key, may be absent; fall back to empty
+                    // collections so that the failure report is still produced instead of an NPE.
+                    resultStream.printf("\tresultEvents:%n%s%n\tinputEvents:%n%s%n",
+                        indent("\t\t", events.getOrDefault(unsuppressedTopic, emptyMap()).getOrDefault(key, new LinkedList<>())),
+                        indent("\t\t", events.getOrDefault("data", emptyMap()).getOrDefault(unwindowedKey, new LinkedList<>())));
+                }
+
+                return false;
+            }
+        }
+        return true;
+    }
+
+    /**
+     * Renders each record on its own line, preceded by the given prefix.
+     *
+     * @param prefix text put in front of every record
+     * @param list   the records to render
+     * @return the concatenated lines, each terminated by a newline
+     */
+    private static String indent(@SuppressWarnings("SameParameterValue") final String prefix,
+                                 final Iterable<ConsumerRecord<String, Number>> list) {
+        final StringBuilder indented = new StringBuilder();
+        list.forEach(entry -> indented.append(prefix).append(entry).append('\n'));
+        return indented.toString();
+    }
+
+    /**
+     * Computes the sum of all integers between the minimum and maximum encoded in the key, inclusive,
+     * as (min + max) * count / 2.
+     */
+    private static Long getSum(final String key) {
+        final int lowest = getMin(key).intValue();
+        final int highest = getMax(key).intValue();
+        final long endpointSum = (long) lowest + highest;
+        final long valueCount = highest - lowest + 1L;
+        return endpointSum * valueCount / 2L;
+    }
+
+    /**
+     * Computes the midpoint of the minimum and maximum encoded in the key.
+     */
+    private static Double getAvg(final String key) {
+        final int lowest = getMin(key).intValue();
+        final int highest = getMax(key).intValue();
+        final long endpointSum = (long) lowest + highest;
+        return endpointSum / 2.0;
+    }
+
+
+    /**
+     * Verifies the "tagg" topic: for every observed key, the last value must equal the number of input
+     * keys whose record count (max - min + 1) equals that key.
+     *
+     * @param resultStream sink for the verification report
+     * @param allData      the generated input values, by key
+     * @param taggEvents   the consumed "tagg" records by key, or {@code null} if none were consumed
+     * @param printResults whether a failure should also dump the offending events
+     * @return {@code true} if there are tagg events and all of them match the expectation
+     */
+    private static boolean verifyTAgg(final PrintStream resultStream,
+                                      final Map<String, Set<Integer>> allData,
+                                      final Map<String, LinkedList<ConsumerRecord<String, Number>>> taggEvents,
+                                      final boolean printResults) {
+        if (taggEvents == null) {
+            resultStream.println("tagg is missing");
+            return false;
+        }
+        if (taggEvents.isEmpty()) {
+            resultStream.println("tagg is empty");
+            return false;
+        }
+
+        resultStream.println("verifying tagg");
+
+        // expected answer: how many input keys share each record count
+        final Map<String, Long> expectedByCount = new HashMap<>();
+        for (final String inputKey : allData.keySet()) {
+            final int lowest = getMin(inputKey).intValue();
+            final int highest = getMax(inputKey).intValue();
+            expectedByCount.merge(Long.toString(highest - lowest + 1L), 1L, Long::sum);
+        }
+
+        // compare the last observed value of every key with the expectation (0 if none is left)
+        for (final Map.Entry<String, LinkedList<ConsumerRecord<String, Number>>> observed : taggEvents.entrySet()) {
+            final String taggKey = observed.getKey();
+            final Long remaining = expectedByCount.remove(taggKey);
+            final long expectedCount = remaining == null ? 0L : remaining;
+
+            final long lastValue = observed.getValue().getLast().value().longValue();
+            if (lastValue != expectedCount) {
+                resultStream.println("fail: key=" + taggKey + " tagg=" + observed.getValue() + " expected=" + expectedCount);
+
+                if (printResults) {
+                    resultStream.println("\t taggEvents: " + observed.getValue());
+                }
+                return false;
+            }
+        }
+        return true;
+    }
+
+    private static Number getMin(final String key) {
+        return Integer.parseInt(key.split("-")[0]);
+    }
+
+    private static Number getMax(final String key) {
+        return Integer.parseInt(key.split("-")[1]);
+    }
+
+    /**
+     * Collects the partitions of all given topics, in topic order, as reported by the consumer.
+     *
+     * @param consumer the consumer used to look up the partitions of each topic
+     * @param topics   the topics whose partitions are wanted
+     * @return one {@code TopicPartition} per reported partition
+     */
+    private static List<TopicPartition> getAllPartitions(final KafkaConsumer<?, ?> consumer, final String... topics) {
+        return Stream.of(topics)
+                     .flatMap(topic -> consumer.partitionsFor(topic).stream())
+                     .map(info -> new TopicPartition(info.topic(), info.partition()))
+                     .collect(Collectors.toCollection(ArrayList::new));
+    }
+
+}
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java b/streams/upgrade-system-tests-25/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java
similarity index 70%
copy from streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java
copy to streams/upgrade-system-tests-25/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java
index 90e6ccd..e8ec04c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java
+++ b/streams/upgrade-system-tests-25/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java
@@ -28,7 +28,6 @@ import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
 
-import java.io.File;
 import java.time.Instant;
 
 public class SmokeTestUtil {
@@ -45,12 +44,17 @@ public class SmokeTestUtil {
             public Processor<Object, Object> get() {
                 return new AbstractProcessor<Object, Object>() {
                     private int numRecordsProcessed = 0;
+                    private long smallestOffset = Long.MAX_VALUE;
+                    private long largestOffset = Long.MIN_VALUE;
 
                     @Override
                     public void init(final ProcessorContext context) {
                         super.init(context);
                         System.out.println("[DEV] initializing processor: topic=" + topic + " taskId=" + context.taskId());
+                        System.out.flush();
                         numRecordsProcessed = 0;
+                        smallestOffset = Long.MAX_VALUE;
+                        largestOffset = Long.MIN_VALUE;
                     }
 
                     @Override
@@ -60,6 +64,27 @@ public class SmokeTestUtil {
                             System.out.printf("%s: %s%n", name, Instant.now());
                             System.out.println("processed " + numRecordsProcessed + " records from topic=" + topic);
                         }
+
+                        if (smallestOffset > context().offset()) {
+                            smallestOffset = context().offset();
+                        }
+                        if (largestOffset < context().offset()) {
+                            largestOffset = context().offset();
+                        }
+                    }
+
+                    @Override
+                    public void close() {
+                        System.out.printf("Close processor for task %s%n", context().taskId());
+                        System.out.println("processed " + numRecordsProcessed + " records");
+                        final long processed;
+                        if (largestOffset >= smallestOffset) {
+                            processed = 1L + largestOffset - smallestOffset;
+                        } else {
+                            processed = 0L;
+                        }
+                        System.out.println("offset " + smallestOffset + " to " + largestOffset + " -> processed " + processed);
+                        System.out.flush();
                     }
                 };
             }
@@ -76,39 +101,19 @@ public class SmokeTestUtil {
     public static class Agg {
 
         KeyValueMapper<String, Long, KeyValue<String, Long>> selector() {
-            return new KeyValueMapper<String, Long, KeyValue<String, Long>>() {
-                @Override
-                public KeyValue<String, Long> apply(final String key, final Long value) {
-                    return new KeyValue<>(value == null ? null : Long.toString(value), 1L);
-                }
-            };
+            return (key, value) -> new KeyValue<>(value == null ? null : Long.toString(value), 1L);
         }
 
         public Initializer<Long> init() {
-            return new Initializer<Long>() {
-                @Override
-                public Long apply() {
-                    return 0L;
-                }
-            };
+            return () -> 0L;
         }
 
         Aggregator<String, Long, Long> adder() {
-            return new Aggregator<String, Long, Long>() {
-                @Override
-                public Long apply(final String aggKey, final Long value, final Long aggregate) {
-                    return aggregate + value;
-                }
-            };
+            return (aggKey, value, aggregate) -> aggregate + value;
         }
 
         Aggregator<String, Long, Long> remover() {
-            return new Aggregator<String, Long, Long>() {
-                @Override
-                public Long apply(final String aggKey, final Long value, final Long aggregate) {
-                    return aggregate - value;
-                }
-            };
+            return (aggKey, value, aggregate) -> aggregate - value;
         }
     }
 
@@ -120,14 +125,6 @@ public class SmokeTestUtil {
 
     static Serde<Double> doubleSerde = Serdes.Double();
 
-    static File createDir(final File parent, final String child) {
-        final File dir = new File(parent, child);
-
-        dir.mkdir();
-
-        return dir;
-    }
-
     public static void sleep(final long duration) {
         try {
             Thread.sleep(duration);
diff --git a/streams/upgrade-system-tests-25/src/test/java/org/apache/kafka/streams/tests/StreamsSmokeTest.java b/streams/upgrade-system-tests-25/src/test/java/org/apache/kafka/streams/tests/StreamsSmokeTest.java
new file mode 100644
index 0000000..07c7d5d
--- /dev/null
+++ b/streams/upgrade-system-tests-25/src/test/java/org/apache/kafka/streams/tests/StreamsSmokeTest.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.tests;
+
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.StreamsConfig;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.UUID;
+
+import static org.apache.kafka.streams.tests.SmokeTestDriver.generate;
+import static org.apache.kafka.streams.tests.SmokeTestDriver.generatePerpetually;
+
+/**
+ * Command-line entry point of the Streams smoke test: starts either the driver (data generation and
+ * result verification) or the stream processing application.
+ */
+public class StreamsSmokeTest {
+
+    /**
+     * Usage: {@code propFileName command [disableAutoTerminate]}, where {@code command} is
+     * {@code "run"} (start the driver) or {@code "process"} (start the stream processing app).
+     * The bootstrap servers are read from the properties file. The mere presence of a third argument
+     * makes the driver generate data perpetually instead of generating a bounded data set and verifying it.
+     *
+     * @param args the command-line arguments described above
+     * @throws IOException declared for loading the properties file
+     */
+    public static void main(final String[] args) throws IOException {
+        if (args.length < 2) {
+            System.err.println("StreamsSmokeTest are expecting two parameters: propFile, command; but only see " + args.length + " parameter");
+            System.exit(1);
+        }
+
+        final String propFileName = args[0];
+        final String command = args[1];
+        final boolean disableAutoTerminate = args.length > 2;
+
+        final Properties streamsProperties = Utils.loadProps(propFileName);
+        final String kafka = streamsProperties.getProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG);
+        final String processingGuarantee = streamsProperties.getProperty(StreamsConfig.PROCESSING_GUARANTEE_CONFIG);
+
+        if (kafka == null) {
+            System.err.println("No bootstrap kafka servers specified in " + StreamsConfig.BOOTSTRAP_SERVERS_CONFIG);
+            System.exit(1);
+        }
+
+        final boolean startsProcessor = "process".equals(command);
+        if (startsProcessor) {
+            // the processing app insists on an explicitly configured, supported processing guarantee
+            final boolean supportedGuarantee = StreamsConfig.AT_LEAST_ONCE.equals(processingGuarantee)
+                || StreamsConfig.EXACTLY_ONCE.equals(processingGuarantee);
+            if (!supportedGuarantee) {
+                System.err.println("processingGuarantee must be either " + StreamsConfig.AT_LEAST_ONCE + " or " +
+                    StreamsConfig.EXACTLY_ONCE);
+
+                System.exit(1);
+            }
+        }
+
+        System.out.println("StreamsTest instance started (StreamsSmokeTest)");
+        System.out.println("command=" + command);
+        System.out.println("props=" + streamsProperties);
+        System.out.println("disableAutoTerminate=" + disableAutoTerminate);
+
+        if ("run".equals(command)) {
+            // this starts the driver (data generation and result verification)
+            final int numKeys = 10;
+            final int maxRecordsPerKey = 500;
+            if (disableAutoTerminate) {
+                generatePerpetually(kafka, numKeys, maxRecordsPerKey);
+            } else {
+                // slow down data production to span 30 seconds so that system tests have time to
+                // do their bounces, etc.
+                final Map<String, Set<Integer>> allData =
+                    generate(kafka, numKeys, maxRecordsPerKey, Duration.ofSeconds(30));
+                SmokeTestDriver.verify(kafka, allData, maxRecordsPerKey);
+            }
+        } else if (startsProcessor) {
+            // this starts the stream processing app
+            new SmokeTestClient(UUID.randomUUID().toString()).start(streamsProperties);
+        } else {
+            System.out.println("unknown command: " + command);
+        }
+    }
+
+}
diff --git a/tests/docker/Dockerfile b/tests/docker/Dockerfile
index 9c37608..20d2e92 100644
--- a/tests/docker/Dockerfile
+++ b/tests/docker/Dockerfile
@@ -58,6 +58,7 @@ RUN mkdir -p "/opt/kafka-2.1.1" && chmod a+rw /opt/kafka-2.1.1 && curl -s "$KAFK
 RUN mkdir -p "/opt/kafka-2.2.2" && chmod a+rw /opt/kafka-2.2.2 && curl -s "$KAFKA_MIRROR/kafka_2.12-2.2.2.tgz" | tar xz --strip-components=1 -C "/opt/kafka-2.2.2"
 RUN mkdir -p "/opt/kafka-2.3.1" && chmod a+rw /opt/kafka-2.3.1 && curl -s "$KAFKA_MIRROR/kafka_2.12-2.3.1.tgz" | tar xz --strip-components=1 -C "/opt/kafka-2.3.1"
 RUN mkdir -p "/opt/kafka-2.4.0" && chmod a+rw /opt/kafka-2.4.0 && curl -s "$KAFKA_MIRROR/kafka_2.12-2.4.0.tgz" | tar xz --strip-components=1 -C "/opt/kafka-2.4.0"
+RUN mkdir -p "/opt/kafka-2.5.0" && chmod a+rw /opt/kafka-2.5.0 && curl -s "$KAFKA_MIRROR/kafka_2.12-2.5.0.tgz" | tar xz --strip-components=1 -C "/opt/kafka-2.5.0"
 
 # Streams test dependencies
 RUN curl -s "$KAFKA_MIRROR/kafka-streams-0.10.0.1-test.jar" -o /opt/kafka-0.10.0.1/libs/kafka-streams-0.10.0.1-test.jar
@@ -71,6 +72,7 @@ RUN curl -s "$KAFKA_MIRROR/kafka-streams-2.1.1-test.jar" -o /opt/kafka-2.1.1/lib
 RUN curl -s "$KAFKA_MIRROR/kafka-streams-2.2.2-test.jar" -o /opt/kafka-2.2.2/libs/kafka-streams-2.2.2-test.jar
 RUN curl -s "$KAFKA_MIRROR/kafka-streams-2.3.1-test.jar" -o /opt/kafka-2.3.1/libs/kafka-streams-2.3.1-test.jar
 RUN curl -s "$KAFKA_MIRROR/kafka-streams-2.4.0-test.jar" -o /opt/kafka-2.4.0/libs/kafka-streams-2.4.0-test.jar
+RUN curl -s "$KAFKA_MIRROR/kafka-streams-2.5.0-test.jar" -o /opt/kafka-2.5.0/libs/kafka-streams-2.5.0-test.jar
 
 # The version of Kibosh to use for testing.
 # If you update this, also update vagrant/base.sh
diff --git a/tests/kafkatest/services/streams.py b/tests/kafkatest/services/streams.py
index 3767d85..63ec055 100644
--- a/tests/kafkatest/services/streams.py
+++ b/tests/kafkatest/services/streams.py
@@ -44,6 +44,18 @@ class StreamsTestBaseService(KafkaPathResolverMixin, JmxMixin, Service):
     CLEAN_NODE_ENABLED = True
 
     logs = {
+        "streams_config": {
+            "path": CONFIG_FILE,
+            "collect_default": True},
+        "streams_config.1": {
+            "path": CONFIG_FILE + ".1",
+            "collect_default": True},
+        "streams_config.0-1": {
+            "path": CONFIG_FILE + ".0-1",
+            "collect_default": True},
+        "streams_config.1-1": {
+            "path": CONFIG_FILE + ".1-1",
+            "collect_default": True},
         "streams_log": {
             "path": LOG_FILE,
             "collect_default": True},
@@ -212,8 +224,10 @@ class StreamsTestBaseService(KafkaPathResolverMixin, JmxMixin, Service):
 
     def pids(self, node):
         try:
-            return [pid for pid in node.account.ssh_capture("cat " + self.PID_FILE, callback=int)]
-        except:
+            pids = [pid for pid in node.account.ssh_capture("cat " + self.PID_FILE, callback=str)]
+            return [int(pid) for pid in pids]
+        except Exception, exception:
+            self.logger.debug(str(exception))
             return []
 
     def stop_nodes(self, clean_shutdown=True):
@@ -302,21 +316,62 @@ class StreamsTestBaseService(KafkaPathResolverMixin, JmxMixin, Service):
 class StreamsSmokeTestBaseService(StreamsTestBaseService):
     """Base class for Streams Smoke Test services providing some common settings and functionality"""
 
-    def __init__(self, test_context, kafka, command, num_threads = 3):
+    def __init__(self, test_context, kafka, command, processing_guarantee = 'at_least_once', num_threads = 3, replication_factor = 3):
         super(StreamsSmokeTestBaseService, self).__init__(test_context,
                                                           kafka,
                                                           "org.apache.kafka.streams.tests.StreamsSmokeTest",
                                                           command)
         self.NUM_THREADS = num_threads
+        self.PROCESSING_GUARANTEE = processing_guarantee
+        self.KAFKA_STREAMS_VERSION = ""
+        self.UPGRADE_FROM = None
+        self.REPLICATION_FACTOR = replication_factor
+
+    def set_version(self, kafka_streams_version):
+        self.KAFKA_STREAMS_VERSION = kafka_streams_version
+
+    def set_upgrade_from(self, upgrade_from):
+        self.UPGRADE_FROM = upgrade_from
 
     def prop_file(self):
         properties = {streams_property.STATE_DIR: self.PERSISTENT_ROOT,
                       streams_property.KAFKA_SERVERS: self.kafka.bootstrap_servers(),
-                      streams_property.NUM_THREADS: self.NUM_THREADS}
+                      "processing.guarantee": self.PROCESSING_GUARANTEE,
+                      streams_property.NUM_THREADS: self.NUM_THREADS,
+                      "replication.factor": self.REPLICATION_FACTOR,
+                      "num.standby.replicas": 2,
+                      "buffered.records.per.partition": 100,
+                      "commit.interval.ms": 1000,
+                      "auto.offset.reset": "earliest",
+                      "acks": "all"}
+
+        if self.UPGRADE_FROM is not None:
+            properties['upgrade.from'] = self.UPGRADE_FROM
 
         cfg = KafkaConfig(**properties)
         return cfg.render()
 
+    def start_cmd(self, node):
+        args = self.args.copy()
+        args['config_file'] = self.CONFIG_FILE
+        args['stdout'] = self.STDOUT_FILE
+        args['stderr'] = self.STDERR_FILE
+        args['pidfile'] = self.PID_FILE
+        args['log4j'] = self.LOG4J_CONFIG_FILE
+        args['version'] = self.KAFKA_STREAMS_VERSION
+        args['kafka_run_class'] = self.path.script("kafka-run-class.sh", node)
+
+        cmd = "( export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%(log4j)s\";" \
+              " INCLUDE_TEST_JARS=true UPGRADE_KAFKA_STREAMS_TEST_VERSION=%(version)s" \
+              " bash -x %(kafka_run_class)s %(streams_class_name)s" \
+              " %(config_file)s %(user_test_args1)s" \
+              " & echo $! >&3 ) " \
+              "1>> %(stdout)s 2>> %(stderr)s 3> %(pidfile)s" % args
+
+        self.logger.info("Executing streams cmd: " + cmd)
+
+        return cmd
+
 class StreamsEosTestBaseService(StreamsTestBaseService):
     """Base class for Streams EOS Test services providing some common settings and functionality"""
 
@@ -359,8 +414,8 @@ class StreamsSmokeTestDriverService(StreamsSmokeTestBaseService):
         return cmd
 
 class StreamsSmokeTestJobRunnerService(StreamsSmokeTestBaseService):
-    def __init__(self, test_context, kafka, num_threads = 3):
-        super(StreamsSmokeTestJobRunnerService, self).__init__(test_context, kafka, "process", num_threads)
+    def __init__(self, test_context, kafka, processing_guarantee = 'at_least_once', num_threads = 3, replication_factor = 3):
+        super(StreamsSmokeTestJobRunnerService, self).__init__(test_context, kafka, "process", processing_guarantee = processing_guarantee, num_threads = num_threads, replication_factor = replication_factor)
 
 class StreamsSmokeTestEOSJobRunnerService(StreamsSmokeTestBaseService):
     def __init__(self, test_context, kafka):
@@ -471,6 +526,10 @@ class StreamsUpgradeTestJobRunnerService(StreamsTestBaseService):
                                                                  "")
         self.UPGRADE_FROM = None
         self.UPGRADE_TO = None
+        self.extra_properties = {}
+
+    def set_config(self, key, value):
+        self.extra_properties[key] = value
 
     def set_version(self, kafka_streams_version):
         self.KAFKA_STREAMS_VERSION = kafka_streams_version
@@ -482,8 +541,10 @@ class StreamsUpgradeTestJobRunnerService(StreamsTestBaseService):
         self.UPGRADE_TO = upgrade_to
 
     def prop_file(self):
-        properties = {streams_property.STATE_DIR: self.PERSISTENT_ROOT,
-                      streams_property.KAFKA_SERVERS: self.kafka.bootstrap_servers()}
+        properties = self.extra_properties.copy()
+        properties[streams_property.STATE_DIR] = self.PERSISTENT_ROOT
+        properties[streams_property.KAFKA_SERVERS] = self.kafka.bootstrap_servers()
+
         if self.UPGRADE_FROM is not None:
             properties['upgrade.from'] = self.UPGRADE_FROM
         if self.UPGRADE_TO == "future_version":
diff --git a/tests/kafkatest/tests/streams/streams_application_upgrade_test.py b/tests/kafkatest/tests/streams/streams_application_upgrade_test.py
new file mode 100644
index 0000000..c27ef5b
--- /dev/null
+++ b/tests/kafkatest/tests/streams/streams_application_upgrade_test.py
@@ -0,0 +1,297 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import random
+from ducktape.mark import matrix
+from ducktape.tests.test import Test
+from ducktape.utils.util import wait_until
+from kafkatest.services.kafka import KafkaService
+from kafkatest.services.streams import StreamsSmokeTestDriverService, StreamsSmokeTestJobRunnerService
+from kafkatest.services.zookeeper import ZookeeperService
+from kafkatest.version import LATEST_2_0, LATEST_2_1, LATEST_2_2, LATEST_2_3, LATEST_2_4, LATEST_2_5, DEV_VERSION, KafkaVersion
+
+smoke_test_versions = [str(LATEST_2_0), str(LATEST_2_1), str(LATEST_2_2), str(LATEST_2_3), str(LATEST_2_4), str(LATEST_2_5)]
+dev_version = [str(DEV_VERSION)]
+
+class StreamsUpgradeTest(Test):
+    """
+    Test upgrading Kafka Streams (all version combinations).
+    If the metadata has changed, the upgrade is more difficult.
+    The metadata version was bumped in 0.10.1.0 and
+    subsequently bumped again in 2.0.0.
+    """
+
+    def __init__(self, test_context):
+        """Set up the base test and record the two default topics with their partition counts."""
+        super(StreamsUpgradeTest, self).__init__(test_context)
+        # 'echo' and 'data' share the same layout; each gets its own config dict
+        self.topics = dict((topic, {'partitions': 5}) for topic in ('echo', 'data'))
+
+    # Pattern for the progress line that the wait/monitor helpers look for in a processor's stdout file.
+    processed_msg = "processed [0-9]* records"
+    # DEV_VERSION string up to the first "-"; get_version_string uses it to match SNAPSHOT builds.
+    base_version_number = str(DEV_VERSION).split("-")[0]
+
+    def perform_broker_upgrade(self, to_version):
+        """
+        Bounce the brokers one after another, switching each to to_version before it is started again.
+        """
+        self.logger.info("First pass bounce - rolling broker upgrade")
+        for broker in self.kafka.nodes:
+            # stop -> re-version -> start, one broker at a time
+            self.kafka.stop_node(broker)
+            broker.version = KafkaVersion(to_version)
+            self.kafka.start_node(broker)
+
+    @matrix(from_version=smoke_test_versions, to_version=dev_version, bounce_type=["full"])
+    def test_app_upgrade(self, from_version, to_version, bounce_type):
+        """
+        Run three smoke test processors on from_version, move them to to_version, then shut down.
+
+        bounce_type "full" restarts all processors together; "rolling" bounces them one at a
+        time in random order (the matrix above currently only schedules "full"). Any other
+        value raises an Exception. At the end every processor must report a clean close.
+        """
+
+        # nothing to upgrade
+        if from_version == to_version:
+            return
+
+        self.zk = ZookeeperService(self.test_context, num_nodes=1)
+        self.zk.start()
+
+        # every topic uses the same layout on the single broker
+        topic_names = ['echo', 'data', 'min', 'min-suppressed', 'min-raw', 'max', 'sum',
+                       'sws-raw', 'sws-suppressed', 'dif', 'cnt', 'avg', 'wcnt', 'tagg']
+        topic_configs = dict((name, {'partitions': 5, 'replication-factor': 1}) for name in topic_names)
+        self.kafka = KafkaService(self.test_context, num_nodes=1, zk=self.zk, topics=topic_configs)
+        self.kafka.start()
+
+        self.driver = StreamsSmokeTestDriverService(self.test_context, self.kafka)
+        self.driver.disable_auto_terminate()
+
+        def new_processor():
+            return StreamsSmokeTestJobRunnerService(self.test_context, self.kafka,
+                                                    processing_guarantee = "at_least_once",
+                                                    replication_factor = 1)
+
+        self.processor1 = new_processor()
+        self.processor2 = new_processor()
+        self.processor3 = new_processor()
+
+        for fresh in (self.processor1, self.processor2, self.processor3):
+            self.purge_state_dir(fresh)
+
+        self.driver.start()
+        self.start_all_nodes_with(from_version)
+
+        self.processors = [self.processor1, self.processor2, self.processor3]
+
+        if bounce_type == "full":
+            self.restart_all_nodes_with(to_version)
+        elif bounce_type == "rolling":
+            random.seed()
+            # upgrade one-by-one via rolling bounce
+            random.shuffle(self.processors)
+            for counter, p in enumerate(self.processors, 1):
+                p.CLEAN_NODE_ENABLED = False
+                self.do_stop_start_bounce(p, None, to_version, counter)
+        else:
+            raise Exception("Unrecognized bounce_type: " + str(bounce_type))
+
+        # shutdown
+        self.driver.stop()
+
+        # Ideally, we would actually verify the expected results.
+        # See KAFKA-10202
+
+        random.shuffle(self.processors)
+        for p in self.processors:
+            owner = p.node
+            # start watching stdout before stopping so the close marker cannot be missed
+            with owner.account.monitor_log(p.STDOUT_FILE) as closed_monitor:
+                p.stop()
+                closed_monitor.wait_until("SMOKE-TEST-CLIENT-CLOSED",
+                                          timeout_sec=60,
+                                          err_msg="Never saw output 'SMOKE-TEST-CLIENT-CLOSED' on " + str(owner.account))
+
+    def start_all_nodes_with(self, version):
+        """
+        Start all three processors on the given version and wait until each one shows the
+        expected version line, the started marker, and processing progress.
+        """
+        trio = (self.processor1, self.processor2, self.processor3)
+
+        for p in trio:
+            self.set_version(p, version)
+
+        for p in trio:
+            p.start()
+
+        # double-check the version
+        kafka_version_str = self.get_version_string(version)
+        for p in trio:
+            self.wait_for_verification(p, kafka_version_str, p.LOG_FILE)
+
+        # wait for the members to join
+        for p in trio:
+            self.wait_for_verification(p, "SMOKE-TEST-CLIENT-STARTED", p.STDOUT_FILE)
+
+        # make sure they've processed something
+        for p in trio:
+            self.wait_for_verification(p, self.processed_msg, p.STDOUT_FILE)
+
+    def restart_all_nodes_with(self, version):
+        """
+        Stop all three processors, roll their files with the ".1-1" suffix, and start them on version.
+
+        Each processor must report that it closed before its files are rolled; after the
+        restart each must show the expected version line, the started marker, and progress.
+        """
+        trio = (self.processor1, self.processor2, self.processor3)
+
+        for p in trio:
+            p.stop_node(p.node)
+
+        # make sure the members have stopped
+        for p in trio:
+            self.wait_for_verification(p, "SMOKE-TEST-CLIENT-CLOSED", p.STDOUT_FILE)
+
+        for p in trio:
+            self.roll_logs(p, ".1-1")
+
+        for p in trio:
+            self.set_version(p, version)
+
+        for p in trio:
+            p.start_node(p.node)
+
+        # double-check the version
+        kafka_version_str = self.get_version_string(version)
+        for p in trio:
+            self.wait_for_verification(p, kafka_version_str, p.LOG_FILE)
+
+        # wait for the members to join
+        for p in trio:
+            self.wait_for_verification(p, "SMOKE-TEST-CLIENT-STARTED", p.STDOUT_FILE)
+
+        # make sure they've processed something
+        for p in trio:
+            self.wait_for_verification(p, self.processed_msg, p.STDOUT_FILE)
+
+    def get_version_string(self, version):
+        """
+        Build the pattern to look for in a processor's log file for the given version.
+
+        Versions starting with 0, 1, 2.0 or 2.1 use "Kafka version : <version>" (space
+        before the colon); SNAPSHOT versions are matched through base_version_number;
+        everything else uses "Kafka version: <version>".
+        """
+        spaced_colon_prefixes = ("0", "1", "2.0", "2.1")
+        if version.startswith(spaced_colon_prefixes):
+            return "Kafka version : " + version
+        if "SNAPSHOT" in version:
+            return "Kafka version.*" + self.base_version_number + ".*SNAPSHOT"
+        return "Kafka version: " + version
+
+    def wait_for_verification(self, processor, message, file, num_lines=1, timeout_sec=60):
+        """
+        Wait until at least num_lines lines of file on the processor's node match message.
+
+        The match count comes from verify_from_file; timeout_sec is passed through to
+        wait_until (default 60, the value that used to be hard-coded here).
+        """
+        wait_until(lambda: self.verify_from_file(processor, message, file) >= num_lines,
+                   timeout_sec=timeout_sec,
+                   err_msg="Expected to read '%s' in %s on %s, but did not" % (message, file, processor.node.account))
+
+    def verify_from_file(self, processor, message, file):
+        """
+        Count the lines of file on the processor's node that match message (grep -E).
+
+        Returns the count as an int, or 0 (after logging a warning) when the command
+        output cannot be parsed as an integer.
+        """
+        result = processor.node.account.ssh_output("grep -E '%s' %s | wc -l" % (message, file), allow_fail=False)
+        try:
+            return int(result)
+        except ValueError:
+            # %r formatting works whether result is str or bytes; string concatenation would not
+            self.logger.warning("Command failed with ValueError: %r", result)
+            return 0
+
+    def set_version(self, processor, version):
+        """Set the processor's version, mapping DEV_VERSION to the empty string (TRUNK)."""
+        # set to TRUNK when the requested version is the dev version
+        target = "" if version == str(DEV_VERSION) else version
+        processor.set_version(target)
+
+    def purge_state_dir(self, processor):
+        """Remove the processor's PERSISTENT_ROOT directory on its node; a failing command is not tolerated."""
+        account = processor.node.account
+        account.ssh("rm -rf " + processor.PERSISTENT_ROOT, allow_fail=False)
+
+    def do_stop_start_bounce(self, processor, upgrade_from, new_version, counter):
+        """
+        Bounce a single processor onto new_version while the other two keep running.
+
+        The processor is stopped, its files are rolled (suffix built from the bounce round
+        and counter), and it is restarted with new_version and upgrade_from. The two other
+        processors in self.processors must keep reporting progress after the stop and after
+        the restart, and must not have logged a subscription decoding error.
+        """
+        kafka_version_str = self.get_version_string(new_version)
+
+        # pick out the two processors that are not being bounced
+        first_other_processor = None
+        second_other_processor = None
+        for p in self.processors:
+            if p != processor:
+                if first_other_processor is None:
+                    first_other_processor = p
+                else:
+                    second_other_processor = p
+
+        node = processor.node
+        first_other_node = first_other_processor.node
+        second_other_node = second_other_processor.node
+
+        # stop processor and wait for rebalance of others
+        with first_other_node.account.monitor_log(first_other_processor.STDOUT_FILE) as first_other_monitor:
+            with second_other_node.account.monitor_log(second_other_processor.STDOUT_FILE) as second_other_monitor:
+                processor.stop_node(processor.node)
+                first_other_monitor.wait_until(self.processed_msg,
+                                               timeout_sec=60,
+                                               err_msg="Never saw output '%s' on " % self.processed_msg + str(first_other_node.account))
+                second_other_monitor.wait_until(self.processed_msg,
+                                                timeout_sec=60,
+                                                err_msg="Never saw output '%s' on " % self.processed_msg + str(second_other_node.account))
+        # ssh() blocks and fails the test if grep finds no close marker; an ssh_capture()
+        # iterator that is never consumed would not surface the command's exit status.
+        node.account.ssh("grep SMOKE-TEST-CLIENT-CLOSED %s" % processor.STDOUT_FILE, allow_fail=False)
+
+        if upgrade_from is None:  # upgrade disabled -- second round of rolling bounces
+            roll_counter = ".1-"  # second round of rolling bounces
+        else:
+            roll_counter = ".0-"  # first  round of rolling bounces
+
+        self.roll_logs(processor, roll_counter + str(counter))
+
+        self.set_version(processor, new_version)
+        processor.set_upgrade_from(upgrade_from)
+
+        grep_metadata_error = "grep \"org.apache.kafka.streams.errors.TaskAssignmentException: unable to decode subscription data: version=2\" "
+        # open all monitors before the restart so no output written after it is missed
+        with node.account.monitor_log(processor.STDOUT_FILE) as monitor:
+            with node.account.monitor_log(processor.LOG_FILE) as log_monitor:
+                with first_other_node.account.monitor_log(first_other_processor.STDOUT_FILE) as first_other_monitor:
+                    with second_other_node.account.monitor_log(second_other_processor.STDOUT_FILE) as second_other_monitor:
+                        processor.start_node(processor.node)
+
+                        log_monitor.wait_until(kafka_version_str,
+                                               timeout_sec=60,
+                                               err_msg="Could not detect Kafka Streams version " + new_version + " on " + str(node.account))
+                        first_other_monitor.wait_until(self.processed_msg,
+                                                       timeout_sec=60,
+                                                       err_msg="Never saw output '%s' on " % self.processed_msg + str(first_other_node.account))
+                        # grep exits non-zero when nothing matches, hence allow_fail=True; any captured line is a failure
+                        found = list(first_other_node.account.ssh_capture(grep_metadata_error + first_other_processor.STDERR_FILE, allow_fail=True))
+                        if len(found) > 0:
+                            raise Exception("Kafka Streams failed with 'unable to decode subscription data: version=2'")
+
+                        second_other_monitor.wait_until(self.processed_msg,
+                                                        timeout_sec=60,
+                                                        err_msg="Never saw output '%s' on " % self.processed_msg + str(second_other_node.account))
+                        found = list(second_other_node.account.ssh_capture(grep_metadata_error + second_other_processor.STDERR_FILE, allow_fail=True))
+                        if len(found) > 0:
+                            raise Exception("Kafka Streams failed with 'unable to decode subscription data: version=2'")
+
+                        monitor.wait_until(self.processed_msg,
+                                           timeout_sec=60,
+                                           err_msg="Never saw output '%s' on " % self.processed_msg + str(node.account))
+
+    def roll_logs(self, processor, roll_suffix):
+        """
+        Rename the processor's stdout, stderr, log and config files on its node by
+        appending roll_suffix to each path; a failing mv is not tolerated.
+        """
+        for attr in ("STDOUT_FILE", "STDERR_FILE", "LOG_FILE", "CONFIG_FILE"):
+            path = getattr(processor, attr)
+            processor.node.account.ssh("mv " + path + " " + path + roll_suffix,
+                                       allow_fail=False)
diff --git a/tests/kafkatest/tests/streams/streams_broker_bounce_test.py b/tests/kafkatest/tests/streams/streams_broker_bounce_test.py
index a20eb02..9b624fe 100644
--- a/tests/kafkatest/tests/streams/streams_broker_bounce_test.py
+++ b/tests/kafkatest/tests/streams/streams_broker_bounce_test.py
@@ -164,7 +164,7 @@ class StreamsBrokerBounceTest(Test):
 
         # Start test harness
         self.driver = StreamsSmokeTestDriverService(self.test_context, self.kafka)
-        self.processor1 = StreamsSmokeTestJobRunnerService(self.test_context, self.kafka, num_threads)
+        self.processor1 = StreamsSmokeTestJobRunnerService(self.test_context, self.kafka, num_threads = num_threads)
 
         self.driver.start()
 
diff --git a/tests/kafkatest/version.py b/tests/kafkatest/version.py
index 9571853..077b6ff 100644
--- a/tests/kafkatest/version.py
+++ b/tests/kafkatest/version.py
@@ -138,3 +138,7 @@ LATEST_2_3 = V_2_3_1
 # 2.4.x versions
 V_2_4_0 = KafkaVersion("2.4.0")
 LATEST_2_4 = V_2_4_0
+
+# 2.5.x versions
+V_2_5_0 = KafkaVersion("2.5.0")
+LATEST_2_5 = V_2_5_0
diff --git a/vagrant/base.sh b/vagrant/base.sh
index 2d213ef..88ebae5 100755
--- a/vagrant/base.sh
+++ b/vagrant/base.sh
@@ -140,6 +140,8 @@ get_kafka 2.3.1 2.12
 chmod a+rw /opt/kafka-2.3.1
 get_kafka 2.4.0 2.12
 chmod a+rw /opt/kafka-2.4.0
+get_kafka 2.5.0 2.12
+chmod a+rw /opt/kafka-2.5.0
 
 # For EC2 nodes, we want to use /mnt, which should have the local disk. On local
 # VMs, we can just create it if it doesn't exist and use it like we'd use