You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/06/27 18:08:26 UTC

[GitHub] [kafka] vvcephei opened a new pull request #8938: KAFKA-10173: Use SmokeTest for upgrade system tests

vvcephei opened a new pull request #8938:
URL: https://github.com/apache/kafka/pull/8938


   Replaces the previous upgrade test's trivial Streams app
   with the commonly used SmokeTest, exercising many more
   features. Also adjust the test matrix to test upgrading
   from each released version since 2.2 to the current branch.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vvcephei commented on a change in pull request #8938: KAFKA-10173: Use SmokeTest for upgrade system tests

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #8938:
URL: https://github.com/apache/kafka/pull/8938#discussion_r446551977



##########
File path: streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java
##########
@@ -38,107 +37,128 @@
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.state.Stores;
 import org.apache.kafka.streams.state.WindowStore;
-import org.apache.kafka.test.TestUtils;
 
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
 import java.time.Duration;
 import java.time.Instant;
 import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 
 import static org.apache.kafka.streams.kstream.Suppressed.untilWindowCloses;
 
 public class SmokeTestClient extends SmokeTestUtil {
 
     private final String name;
 
-    private Thread thread;
     private KafkaStreams streams;
     private boolean uncaughtException = false;
-    private boolean started;
-    private boolean closed;
+    private volatile boolean closed;
 
-    public SmokeTestClient(final String name) {
-        super();
-        this.name = name;
+    private static void addShutdownHook(final String name, final Runnable runnable) {

Review comment:
       I inlined these utilities to make this class "portable". I.e., we can copy and paste it into the "upgrade test" modules without also dragging in a dependency on the client utils.

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/SmokeTestDriverIntegrationTest.java
##########
@@ -104,10 +104,6 @@ public void shouldWorkWithRebalance() throws InterruptedException {
             clients.add(smokeTestClient);
             smokeTestClient.start(props);
 
-            while (!clients.get(clients.size() - 1).started()) {
-                Thread.sleep(100);
-            }

Review comment:
       This isn't needed anymore, as `SmokeTestClient#start` now blocks until the instance goes to running the first time.

##########
File path: streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java
##########
@@ -38,107 +37,128 @@
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.state.Stores;
 import org.apache.kafka.streams.state.WindowStore;
-import org.apache.kafka.test.TestUtils;
 
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
 import java.time.Duration;
 import java.time.Instant;
 import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 
 import static org.apache.kafka.streams.kstream.Suppressed.untilWindowCloses;
 
 public class SmokeTestClient extends SmokeTestUtil {
 
     private final String name;
 
-    private Thread thread;
     private KafkaStreams streams;
     private boolean uncaughtException = false;
-    private boolean started;
-    private boolean closed;
+    private volatile boolean closed;
 
-    public SmokeTestClient(final String name) {
-        super();
-        this.name = name;
+    private static void addShutdownHook(final String name, final Runnable runnable) {
+        if (name != null) {
+            Runtime.getRuntime().addShutdownHook(KafkaThread.nonDaemon(name, runnable));
+        } else {
+            Runtime.getRuntime().addShutdownHook(new Thread(runnable));
+        }
     }
 
-    public boolean started() {
-        return started;
+    private static File tempDirectory() {
+        final String prefix = "kafka-";
+        final File file;
+        try {
+            file = Files.createTempDirectory(prefix).toFile();
+        } catch (final IOException ex) {
+            throw new RuntimeException("Failed to create a temp dir", ex);
+        }
+        file.deleteOnExit();
+
+        addShutdownHook("delete-temp-file-shutdown-hook", () -> {
+            try {
+                Utils.delete(file);
+            } catch (final IOException e) {
+                System.out.println("Error deleting " + file.getAbsolutePath());
+                e.printStackTrace(System.out);
+            }
+        });
+
+        return file;
+    }
+
+    public SmokeTestClient(final String name) {
+        this.name = name;
     }
 
     public boolean closed() {
         return closed;
     }
 
     public void start(final Properties streamsProperties) {
-        streams = createKafkaStreams(streamsProperties);
+        final Topology build = getTopology();
+        streams = new KafkaStreams(build, getStreamsConfig(streamsProperties));
+
+        final CountDownLatch countDownLatch = new CountDownLatch(1);
+        streams.setStateListener((newState, oldState) -> {
+            System.out.printf("%s %s: %s -> %s%n", name, Instant.now(), oldState, newState);
+            if (oldState == KafkaStreams.State.REBALANCING && newState == KafkaStreams.State.RUNNING) {
+                countDownLatch.countDown();
+            }
+
+            if (newState == KafkaStreams.State.NOT_RUNNING) {
+                closed = true;
+            }
+        });
+
         streams.setUncaughtExceptionHandler((t, e) -> {
             System.out.println(name + ": SMOKE-TEST-CLIENT-EXCEPTION");
+            System.out.println(name + ": FATAL: An unexpected exception is encountered on thread " + t + ": " + e);
+            e.printStackTrace(System.out);
             uncaughtException = true;
-            e.printStackTrace();
+            streams.close(Duration.ofSeconds(30));
         });
 
-        Exit.addShutdownHook("streams-shutdown-hook", () -> close());
+        addShutdownHook("streams-shutdown-hook", this::close);
 
-        thread = new Thread(() -> streams.start());
-        thread.start();
+        streams.start();
+        try {
+            if (!countDownLatch.await(1, TimeUnit.MINUTES)) {
+                System.out.println(name + ": SMOKE-TEST-CLIENT-EXCEPTION: Didn't start in one minute");
+            }
+        } catch (final InterruptedException e) {
+            System.out.println(name + ": SMOKE-TEST-CLIENT-EXCEPTION: " + e);
+            e.printStackTrace(System.out);
+        }
+        System.out.println(name + ": SMOKE-TEST-CLIENT-STARTED");
+        System.out.println(name + " started at " + Instant.now());
     }
 
     public void closeAsync() {
         streams.close(Duration.ZERO);
     }
 
     public void close() {
-        streams.close(Duration.ofSeconds(5));
-        // do not remove these printouts since they are needed for health scripts
-        if (!uncaughtException) {
+        final boolean wasClosed = streams.close(Duration.ofMinutes(1));

Review comment:
       5 seconds seems a bit stingy :) . Note that we were previously ignoring the case where we didn't close within the timeout, so we have no idea if 5 seconds was ever long enough. I figured a minute is more reasonable.

##########
File path: streams/upgrade-system-tests-22/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
##########
@@ -0,0 +1,632 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.tests;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
+
+import java.io.ByteArrayOutputStream;
+import java.io.PrintStream;
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static java.util.Collections.emptyMap;
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+
+public class SmokeTestDriver extends SmokeTestUtil {

Review comment:
       copy/pasted

##########
File path: streams/upgrade-system-tests-23/src/test/java/org/apache/kafka/streams/tests/StreamsSmokeTest.java
##########
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.tests;
+
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.StreamsConfig;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.UUID;
+
+import static org.apache.kafka.streams.tests.SmokeTestDriver.generate;
+import static org.apache.kafka.streams.tests.SmokeTestDriver.generatePerpetually;
+
+public class StreamsSmokeTest {

Review comment:
       copy/pasted

##########
File path: streams/upgrade-system-tests-24/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java
##########
@@ -0,0 +1,298 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.tests;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.KafkaThread;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.Grouped;
+import org.apache.kafka.streams.kstream.KGroupedStream;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.Produced;
+import org.apache.kafka.streams.kstream.Suppressed.BufferConfig;
+import org.apache.kafka.streams.kstream.TimeWindows;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.streams.state.WindowStore;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.kafka.streams.kstream.Suppressed.untilWindowCloses;
+
+public class SmokeTestClient extends SmokeTestUtil {

Review comment:
       copy/pasted

##########
File path: streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java
##########
@@ -38,107 +37,128 @@
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.state.Stores;
 import org.apache.kafka.streams.state.WindowStore;
-import org.apache.kafka.test.TestUtils;
 
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
 import java.time.Duration;
 import java.time.Instant;
 import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 
 import static org.apache.kafka.streams.kstream.Suppressed.untilWindowCloses;
 
 public class SmokeTestClient extends SmokeTestUtil {
 
     private final String name;
 
-    private Thread thread;
     private KafkaStreams streams;
     private boolean uncaughtException = false;
-    private boolean started;
-    private boolean closed;
+    private volatile boolean closed;
 
-    public SmokeTestClient(final String name) {
-        super();
-        this.name = name;
+    private static void addShutdownHook(final String name, final Runnable runnable) {
+        if (name != null) {
+            Runtime.getRuntime().addShutdownHook(KafkaThread.nonDaemon(name, runnable));
+        } else {
+            Runtime.getRuntime().addShutdownHook(new Thread(runnable));
+        }
     }
 
-    public boolean started() {
-        return started;
+    private static File tempDirectory() {
+        final String prefix = "kafka-";
+        final File file;
+        try {
+            file = Files.createTempDirectory(prefix).toFile();
+        } catch (final IOException ex) {
+            throw new RuntimeException("Failed to create a temp dir", ex);
+        }
+        file.deleteOnExit();
+
+        addShutdownHook("delete-temp-file-shutdown-hook", () -> {
+            try {
+                Utils.delete(file);
+            } catch (final IOException e) {
+                System.out.println("Error deleting " + file.getAbsolutePath());
+                e.printStackTrace(System.out);
+            }
+        });
+
+        return file;
+    }
+
+    public SmokeTestClient(final String name) {
+        this.name = name;
     }
 
     public boolean closed() {
         return closed;
     }
 
     public void start(final Properties streamsProperties) {
-        streams = createKafkaStreams(streamsProperties);
+        final Topology build = getTopology();
+        streams = new KafkaStreams(build, getStreamsConfig(streamsProperties));
+
+        final CountDownLatch countDownLatch = new CountDownLatch(1);
+        streams.setStateListener((newState, oldState) -> {
+            System.out.printf("%s %s: %s -> %s%n", name, Instant.now(), oldState, newState);
+            if (oldState == KafkaStreams.State.REBALANCING && newState == KafkaStreams.State.RUNNING) {
+                countDownLatch.countDown();
+            }
+
+            if (newState == KafkaStreams.State.NOT_RUNNING) {
+                closed = true;
+            }
+        });
+
         streams.setUncaughtExceptionHandler((t, e) -> {
             System.out.println(name + ": SMOKE-TEST-CLIENT-EXCEPTION");
+            System.out.println(name + ": FATAL: An unexpected exception is encountered on thread " + t + ": " + e);
+            e.printStackTrace(System.out);
             uncaughtException = true;
-            e.printStackTrace();
+            streams.close(Duration.ofSeconds(30));
         });
 
-        Exit.addShutdownHook("streams-shutdown-hook", () -> close());
+        addShutdownHook("streams-shutdown-hook", this::close);
 
-        thread = new Thread(() -> streams.start());
-        thread.start();
+        streams.start();
+        try {
+            if (!countDownLatch.await(1, TimeUnit.MINUTES)) {
+                System.out.println(name + ": SMOKE-TEST-CLIENT-EXCEPTION: Didn't start in one minute");
+            }
+        } catch (final InterruptedException e) {
+            System.out.println(name + ": SMOKE-TEST-CLIENT-EXCEPTION: " + e);
+            e.printStackTrace(System.out);
+        }
+        System.out.println(name + ": SMOKE-TEST-CLIENT-STARTED");

Review comment:
       Blocking and then printing this gives the system tests the ability to explicitly wait until the instances are joined.

##########
File path: streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java
##########
@@ -38,107 +37,128 @@
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.state.Stores;
 import org.apache.kafka.streams.state.WindowStore;
-import org.apache.kafka.test.TestUtils;
 
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
 import java.time.Duration;
 import java.time.Instant;
 import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 
 import static org.apache.kafka.streams.kstream.Suppressed.untilWindowCloses;
 
 public class SmokeTestClient extends SmokeTestUtil {
 
     private final String name;
 
-    private Thread thread;
     private KafkaStreams streams;
     private boolean uncaughtException = false;
-    private boolean started;

Review comment:
       no longer needed, since start() is now blocking

##########
File path: streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java
##########
@@ -38,107 +37,128 @@
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.state.Stores;
 import org.apache.kafka.streams.state.WindowStore;
-import org.apache.kafka.test.TestUtils;
 
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
 import java.time.Duration;
 import java.time.Instant;
 import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 
 import static org.apache.kafka.streams.kstream.Suppressed.untilWindowCloses;
 
 public class SmokeTestClient extends SmokeTestUtil {
 
     private final String name;
 
-    private Thread thread;

Review comment:
       This thread was actually pointless, since StreamThreads are already user (not daemon) threads.

##########
File path: streams/upgrade-system-tests-24/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
##########
@@ -0,0 +1,622 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.tests;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
+
+import java.io.ByteArrayOutputStream;
+import java.io.PrintStream;
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static java.util.Collections.emptyMap;
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+
+public class SmokeTestDriver extends SmokeTestUtil {

Review comment:
       copy/pasted

##########
File path: streams/upgrade-system-tests-22/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java
##########
@@ -0,0 +1,298 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.tests;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.KafkaThread;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.Grouped;
+import org.apache.kafka.streams.kstream.KGroupedStream;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.Produced;
+import org.apache.kafka.streams.kstream.Suppressed.BufferConfig;
+import org.apache.kafka.streams.kstream.TimeWindows;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.streams.state.WindowStore;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.kafka.streams.kstream.Suppressed.untilWindowCloses;
+
+public class SmokeTestClient extends SmokeTestUtil {

Review comment:
       Everything in the `upgrade-system-tests...` directories is just copy/pasted from the main SmokeTest implementations.

##########
File path: streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java
##########
@@ -38,107 +37,128 @@
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.state.Stores;
 import org.apache.kafka.streams.state.WindowStore;
-import org.apache.kafka.test.TestUtils;
 
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
 import java.time.Duration;
 import java.time.Instant;
 import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 
 import static org.apache.kafka.streams.kstream.Suppressed.untilWindowCloses;
 
 public class SmokeTestClient extends SmokeTestUtil {
 
     private final String name;
 
-    private Thread thread;
     private KafkaStreams streams;
     private boolean uncaughtException = false;
-    private boolean started;
-    private boolean closed;
+    private volatile boolean closed;
 
-    public SmokeTestClient(final String name) {
-        super();
-        this.name = name;
+    private static void addShutdownHook(final String name, final Runnable runnable) {
+        if (name != null) {
+            Runtime.getRuntime().addShutdownHook(KafkaThread.nonDaemon(name, runnable));
+        } else {
+            Runtime.getRuntime().addShutdownHook(new Thread(runnable));
+        }
     }
 
-    public boolean started() {
-        return started;
+    private static File tempDirectory() {
+        final String prefix = "kafka-";
+        final File file;
+        try {
+            file = Files.createTempDirectory(prefix).toFile();
+        } catch (final IOException ex) {
+            throw new RuntimeException("Failed to create a temp dir", ex);
+        }
+        file.deleteOnExit();
+
+        addShutdownHook("delete-temp-file-shutdown-hook", () -> {
+            try {
+                Utils.delete(file);
+            } catch (final IOException e) {
+                System.out.println("Error deleting " + file.getAbsolutePath());
+                e.printStackTrace(System.out);
+            }
+        });
+
+        return file;
+    }
+
+    public SmokeTestClient(final String name) {
+        this.name = name;
     }
 
     public boolean closed() {
         return closed;
     }
 
     public void start(final Properties streamsProperties) {
-        streams = createKafkaStreams(streamsProperties);
+        final Topology build = getTopology();
+        streams = new KafkaStreams(build, getStreamsConfig(streamsProperties));
+
+        final CountDownLatch countDownLatch = new CountDownLatch(1);
+        streams.setStateListener((newState, oldState) -> {
+            System.out.printf("%s %s: %s -> %s%n", name, Instant.now(), oldState, newState);
+            if (oldState == KafkaStreams.State.REBALANCING && newState == KafkaStreams.State.RUNNING) {
+                countDownLatch.countDown();
+            }
+
+            if (newState == KafkaStreams.State.NOT_RUNNING) {
+                closed = true;
+            }
+        });
+
         streams.setUncaughtExceptionHandler((t, e) -> {
             System.out.println(name + ": SMOKE-TEST-CLIENT-EXCEPTION");
+            System.out.println(name + ": FATAL: An unexpected exception is encountered on thread " + t + ": " + e);
+            e.printStackTrace(System.out);
             uncaughtException = true;
-            e.printStackTrace();
+            streams.close(Duration.ofSeconds(30));
         });
 
-        Exit.addShutdownHook("streams-shutdown-hook", () -> close());
+        addShutdownHook("streams-shutdown-hook", this::close);
 
-        thread = new Thread(() -> streams.start());
-        thread.start();
+        streams.start();
+        try {
+            if (!countDownLatch.await(1, TimeUnit.MINUTES)) {
+                System.out.println(name + ": SMOKE-TEST-CLIENT-EXCEPTION: Didn't start in one minute");
+            }
+        } catch (final InterruptedException e) {
+            System.out.println(name + ": SMOKE-TEST-CLIENT-EXCEPTION: " + e);
+            e.printStackTrace(System.out);
+        }
+        System.out.println(name + ": SMOKE-TEST-CLIENT-STARTED");
+        System.out.println(name + " started at " + Instant.now());
     }
 
     public void closeAsync() {
         streams.close(Duration.ZERO);
     }
 
     public void close() {
-        streams.close(Duration.ofSeconds(5));
-        // do not remove these printouts since they are needed for health scripts
-        if (!uncaughtException) {
+        final boolean wasClosed = streams.close(Duration.ofMinutes(1));
+
+        if (wasClosed && !uncaughtException) {

Review comment:
       I just happened to notice that we weren't previously checking that Streams actually finished closing. It looks like we were expecting some kind of exception to get thrown during the `join()` below, but that's not the way it works.

##########
File path: streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java
##########
@@ -38,107 +37,128 @@
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.state.Stores;
 import org.apache.kafka.streams.state.WindowStore;
-import org.apache.kafka.test.TestUtils;
 
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
 import java.time.Duration;
 import java.time.Instant;
 import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 
 import static org.apache.kafka.streams.kstream.Suppressed.untilWindowCloses;
 
 public class SmokeTestClient extends SmokeTestUtil {
 
     private final String name;
 
-    private Thread thread;
     private KafkaStreams streams;
     private boolean uncaughtException = false;
-    private boolean started;
-    private boolean closed;
+    private volatile boolean closed;
 
-    public SmokeTestClient(final String name) {
-        super();
-        this.name = name;
+    private static void addShutdownHook(final String name, final Runnable runnable) {
+        if (name != null) {
+            Runtime.getRuntime().addShutdownHook(KafkaThread.nonDaemon(name, runnable));
+        } else {
+            Runtime.getRuntime().addShutdownHook(new Thread(runnable));
+        }
     }
 
-    public boolean started() {
-        return started;
+    private static File tempDirectory() {
+        final String prefix = "kafka-";
+        final File file;
+        try {
+            file = Files.createTempDirectory(prefix).toFile();
+        } catch (final IOException ex) {
+            throw new RuntimeException("Failed to create a temp dir", ex);
+        }
+        file.deleteOnExit();
+
+        addShutdownHook("delete-temp-file-shutdown-hook", () -> {
+            try {
+                Utils.delete(file);
+            } catch (final IOException e) {
+                System.out.println("Error deleting " + file.getAbsolutePath());
+                e.printStackTrace(System.out);
+            }
+        });
+
+        return file;
+    }
+
+    public SmokeTestClient(final String name) {
+        this.name = name;
     }
 
     public boolean closed() {
         return closed;
     }
 
     public void start(final Properties streamsProperties) {
-        streams = createKafkaStreams(streamsProperties);
+        final Topology build = getTopology();
+        streams = new KafkaStreams(build, getStreamsConfig(streamsProperties));
+
+        final CountDownLatch countDownLatch = new CountDownLatch(1);
+        streams.setStateListener((newState, oldState) -> {
+            System.out.printf("%s %s: %s -> %s%n", name, Instant.now(), oldState, newState);
+            if (oldState == KafkaStreams.State.REBALANCING && newState == KafkaStreams.State.RUNNING) {
+                countDownLatch.countDown();
+            }
+
+            if (newState == KafkaStreams.State.NOT_RUNNING) {
+                closed = true;
+            }
+        });
+
         streams.setUncaughtExceptionHandler((t, e) -> {
             System.out.println(name + ": SMOKE-TEST-CLIENT-EXCEPTION");
+            System.out.println(name + ": FATAL: An unexpected exception is encountered on thread " + t + ": " + e);
+            e.printStackTrace(System.out);
             uncaughtException = true;
-            e.printStackTrace();
+            streams.close(Duration.ofSeconds(30));
         });
 
-        Exit.addShutdownHook("streams-shutdown-hook", () -> close());
+        addShutdownHook("streams-shutdown-hook", this::close);
 
-        thread = new Thread(() -> streams.start());
-        thread.start();
+        streams.start();
+        try {
+            if (!countDownLatch.await(1, TimeUnit.MINUTES)) {
+                System.out.println(name + ": SMOKE-TEST-CLIENT-EXCEPTION: Didn't start in one minute");
+            }
+        } catch (final InterruptedException e) {
+            System.out.println(name + ": SMOKE-TEST-CLIENT-EXCEPTION: " + e);
+            e.printStackTrace(System.out);
+        }
+        System.out.println(name + ": SMOKE-TEST-CLIENT-STARTED");
+        System.out.println(name + " started at " + Instant.now());
     }
 
     public void closeAsync() {
         streams.close(Duration.ZERO);
     }
 
     public void close() {
-        streams.close(Duration.ofSeconds(5));
-        // do not remove these printouts since they are needed for health scripts
-        if (!uncaughtException) {
+        final boolean wasClosed = streams.close(Duration.ofMinutes(1));
+
+        if (wasClosed && !uncaughtException) {
             System.out.println(name + ": SMOKE-TEST-CLIENT-CLOSED");
-        }
-        try {
-            thread.join();
-        } catch (final Exception ex) {
-            // do not remove these printouts since they are needed for health scripts
+        } else if (wasClosed) {
             System.out.println(name + ": SMOKE-TEST-CLIENT-EXCEPTION");
-            // ignore
+        } else {
+            System.out.println(name + ": SMOKE-TEST-CLIENT-EXCEPTION: Didn't close");

Review comment:
       So here's what we print if there was no uncaught exception, but we also didn't close in time.

##########
File path: streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java
##########
@@ -38,107 +37,128 @@
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.state.Stores;
 import org.apache.kafka.streams.state.WindowStore;
-import org.apache.kafka.test.TestUtils;
 
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
 import java.time.Duration;
 import java.time.Instant;
 import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 
 import static org.apache.kafka.streams.kstream.Suppressed.untilWindowCloses;
 
 public class SmokeTestClient extends SmokeTestUtil {
 
     private final String name;
 
-    private Thread thread;
     private KafkaStreams streams;
     private boolean uncaughtException = false;
-    private boolean started;
-    private boolean closed;
+    private volatile boolean closed;
 
-    public SmokeTestClient(final String name) {
-        super();
-        this.name = name;
+    private static void addShutdownHook(final String name, final Runnable runnable) {
+        if (name != null) {
+            Runtime.getRuntime().addShutdownHook(KafkaThread.nonDaemon(name, runnable));
+        } else {
+            Runtime.getRuntime().addShutdownHook(new Thread(runnable));
+        }
     }
 
-    public boolean started() {
-        return started;
+    private static File tempDirectory() {
+        final String prefix = "kafka-";
+        final File file;
+        try {
+            file = Files.createTempDirectory(prefix).toFile();
+        } catch (final IOException ex) {
+            throw new RuntimeException("Failed to create a temp dir", ex);
+        }
+        file.deleteOnExit();
+
+        addShutdownHook("delete-temp-file-shutdown-hook", () -> {
+            try {
+                Utils.delete(file);
+            } catch (final IOException e) {
+                System.out.println("Error deleting " + file.getAbsolutePath());
+                e.printStackTrace(System.out);
+            }
+        });
+
+        return file;
+    }
+
+    public SmokeTestClient(final String name) {
+        this.name = name;
     }
 
     public boolean closed() {
         return closed;
     }
 
     public void start(final Properties streamsProperties) {
-        streams = createKafkaStreams(streamsProperties);

Review comment:
       Inlining this function actually fixed a bug in which the next line was setting a handler that actually replaced the handler registered in createKafkaStreams.
   
   The function was only used from right here anyway, so it was needless complexity.

##########
File path: streams/upgrade-system-tests-22/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java
##########
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.tests;
+
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.kstream.Initializer;
+import org.apache.kafka.streams.kstream.KeyValueMapper;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.ProcessorSupplier;
+
+import java.time.Instant;
+
+public class SmokeTestUtil {

Review comment:
       copy/pasted

##########
File path: streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java
##########
@@ -38,107 +37,128 @@
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.state.Stores;
 import org.apache.kafka.streams.state.WindowStore;
-import org.apache.kafka.test.TestUtils;
 
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
 import java.time.Duration;
 import java.time.Instant;
 import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 
 import static org.apache.kafka.streams.kstream.Suppressed.untilWindowCloses;
 
 public class SmokeTestClient extends SmokeTestUtil {
 
     private final String name;
 
-    private Thread thread;
     private KafkaStreams streams;
     private boolean uncaughtException = false;
-    private boolean started;
-    private boolean closed;
+    private volatile boolean closed;
 
-    public SmokeTestClient(final String name) {
-        super();
-        this.name = name;
+    private static void addShutdownHook(final String name, final Runnable runnable) {
+        if (name != null) {
+            Runtime.getRuntime().addShutdownHook(KafkaThread.nonDaemon(name, runnable));
+        } else {
+            Runtime.getRuntime().addShutdownHook(new Thread(runnable));
+        }
     }
 
-    public boolean started() {
-        return started;
+    private static File tempDirectory() {
+        final String prefix = "kafka-";
+        final File file;
+        try {
+            file = Files.createTempDirectory(prefix).toFile();
+        } catch (final IOException ex) {
+            throw new RuntimeException("Failed to create a temp dir", ex);
+        }
+        file.deleteOnExit();
+
+        addShutdownHook("delete-temp-file-shutdown-hook", () -> {
+            try {
+                Utils.delete(file);
+            } catch (final IOException e) {
+                System.out.println("Error deleting " + file.getAbsolutePath());
+                e.printStackTrace(System.out);
+            }
+        });
+
+        return file;
+    }
+
+    public SmokeTestClient(final String name) {
+        this.name = name;
     }
 
     public boolean closed() {
         return closed;
     }
 
     public void start(final Properties streamsProperties) {
-        streams = createKafkaStreams(streamsProperties);
+        final Topology build = getTopology();
+        streams = new KafkaStreams(build, getStreamsConfig(streamsProperties));
+
+        final CountDownLatch countDownLatch = new CountDownLatch(1);
+        streams.setStateListener((newState, oldState) -> {
+            System.out.printf("%s %s: %s -> %s%n", name, Instant.now(), oldState, newState);
+            if (oldState == KafkaStreams.State.REBALANCING && newState == KafkaStreams.State.RUNNING) {
+                countDownLatch.countDown();
+            }
+
+            if (newState == KafkaStreams.State.NOT_RUNNING) {
+                closed = true;
+            }
+        });
+
         streams.setUncaughtExceptionHandler((t, e) -> {
             System.out.println(name + ": SMOKE-TEST-CLIENT-EXCEPTION");
+            System.out.println(name + ": FATAL: An unexpected exception is encountered on thread " + t + ": " + e);
+            e.printStackTrace(System.out);
             uncaughtException = true;
-            e.printStackTrace();
+            streams.close(Duration.ofSeconds(30));
         });
 
-        Exit.addShutdownHook("streams-shutdown-hook", () -> close());
+        addShutdownHook("streams-shutdown-hook", this::close);
 
-        thread = new Thread(() -> streams.start());
-        thread.start();
+        streams.start();
+        try {
+            if (!countDownLatch.await(1, TimeUnit.MINUTES)) {
+                System.out.println(name + ": SMOKE-TEST-CLIENT-EXCEPTION: Didn't start in one minute");
+            }
+        } catch (final InterruptedException e) {
+            System.out.println(name + ": SMOKE-TEST-CLIENT-EXCEPTION: " + e);
+            e.printStackTrace(System.out);
+        }
+        System.out.println(name + ": SMOKE-TEST-CLIENT-STARTED");
+        System.out.println(name + " started at " + Instant.now());
     }
 
     public void closeAsync() {
         streams.close(Duration.ZERO);
     }
 
     public void close() {
-        streams.close(Duration.ofSeconds(5));
-        // do not remove these printouts since they are needed for health scripts
-        if (!uncaughtException) {
+        final boolean wasClosed = streams.close(Duration.ofMinutes(1));
+
+        if (wasClosed && !uncaughtException) {
             System.out.println(name + ": SMOKE-TEST-CLIENT-CLOSED");
-        }
-        try {
-            thread.join();
-        } catch (final Exception ex) {
-            // do not remove these printouts since they are needed for health scripts
+        } else if (wasClosed) {
             System.out.println(name + ": SMOKE-TEST-CLIENT-EXCEPTION");
-            // ignore
+        } else {
+            System.out.println(name + ": SMOKE-TEST-CLIENT-EXCEPTION: Didn't close");
         }
     }
 
     private Properties getStreamsConfig(final Properties props) {
         final Properties fullProps = new Properties(props);
         fullProps.put(StreamsConfig.APPLICATION_ID_CONFIG, "SmokeTest");
         fullProps.put(StreamsConfig.CLIENT_ID_CONFIG, "SmokeTest-" + name);
-        fullProps.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 3);
-        fullProps.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 2);
-        fullProps.put(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, 100);
-        fullProps.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
-        fullProps.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3);
-        fullProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-        fullProps.put(ProducerConfig.ACKS_CONFIG, "all");

Review comment:
       Moved these to the python code, where the "properties file" itself is built. I left only the properties that are better off dynamically generated in the java code.

##########
File path: streams/upgrade-system-tests-24/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java
##########
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.tests;
+
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.kstream.Initializer;
+import org.apache.kafka.streams.kstream.KeyValueMapper;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.ProcessorSupplier;
+
+import java.time.Instant;
+
+public class SmokeTestUtil {

Review comment:
       copy/pasted

##########
File path: streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java
##########
@@ -38,107 +37,128 @@
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.state.Stores;
 import org.apache.kafka.streams.state.WindowStore;
-import org.apache.kafka.test.TestUtils;
 
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
 import java.time.Duration;
 import java.time.Instant;
 import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 
 import static org.apache.kafka.streams.kstream.Suppressed.untilWindowCloses;
 
 public class SmokeTestClient extends SmokeTestUtil {
 
     private final String name;
 
-    private Thread thread;
     private KafkaStreams streams;
     private boolean uncaughtException = false;
-    private boolean started;
-    private boolean closed;
+    private volatile boolean closed;
 
-    public SmokeTestClient(final String name) {
-        super();
-        this.name = name;
+    private static void addShutdownHook(final String name, final Runnable runnable) {
+        if (name != null) {
+            Runtime.getRuntime().addShutdownHook(KafkaThread.nonDaemon(name, runnable));
+        } else {
+            Runtime.getRuntime().addShutdownHook(new Thread(runnable));
+        }
     }
 
-    public boolean started() {
-        return started;
+    private static File tempDirectory() {
+        final String prefix = "kafka-";
+        final File file;
+        try {
+            file = Files.createTempDirectory(prefix).toFile();
+        } catch (final IOException ex) {
+            throw new RuntimeException("Failed to create a temp dir", ex);
+        }
+        file.deleteOnExit();
+
+        addShutdownHook("delete-temp-file-shutdown-hook", () -> {
+            try {
+                Utils.delete(file);
+            } catch (final IOException e) {
+                System.out.println("Error deleting " + file.getAbsolutePath());
+                e.printStackTrace(System.out);
+            }
+        });
+
+        return file;
+    }
+
+    public SmokeTestClient(final String name) {
+        this.name = name;
     }
 
     public boolean closed() {
         return closed;
     }
 
     public void start(final Properties streamsProperties) {
-        streams = createKafkaStreams(streamsProperties);
+        final Topology build = getTopology();
+        streams = new KafkaStreams(build, getStreamsConfig(streamsProperties));
+
+        final CountDownLatch countDownLatch = new CountDownLatch(1);
+        streams.setStateListener((newState, oldState) -> {
+            System.out.printf("%s %s: %s -> %s%n", name, Instant.now(), oldState, newState);
+            if (oldState == KafkaStreams.State.REBALANCING && newState == KafkaStreams.State.RUNNING) {
+                countDownLatch.countDown();
+            }
+
+            if (newState == KafkaStreams.State.NOT_RUNNING) {
+                closed = true;
+            }
+        });
+
         streams.setUncaughtExceptionHandler((t, e) -> {
             System.out.println(name + ": SMOKE-TEST-CLIENT-EXCEPTION");
+            System.out.println(name + ": FATAL: An unexpected exception is encountered on thread " + t + ": " + e);
+            e.printStackTrace(System.out);
             uncaughtException = true;
-            e.printStackTrace();
+            streams.close(Duration.ofSeconds(30));

Review comment:
       This is logic from the inlined function that had gotten lost when we set the handler again.

##########
File path: streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java
##########
@@ -75,7 +75,7 @@ public void process(final Object key, final Object value) {
 
                     @Override
                     public void close() {
-                        System.out.printf("Close processor for task %s", context().taskId());
+                        System.out.printf("Close processor for task %s%n", context().taskId());

Review comment:
       I just happened to notice that the newline was missing when I looked at the stdout. It didn't affect the tests' ability to grep.

##########
File path: streams/upgrade-system-tests-23/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java
##########
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.tests;
+
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.kstream.Initializer;
+import org.apache.kafka.streams.kstream.KeyValueMapper;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.ProcessorSupplier;
+
+import java.time.Instant;
+
+public class SmokeTestUtil {

Review comment:
       copy/pasted

##########
File path: streams/upgrade-system-tests-23/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
##########
@@ -0,0 +1,622 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.tests;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
+
+import java.io.ByteArrayOutputStream;
+import java.io.PrintStream;
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static java.util.Collections.emptyMap;
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+
+public class SmokeTestDriver extends SmokeTestUtil {

Review comment:
       copy/pasted

##########
File path: streams/upgrade-system-tests-23/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java
##########
@@ -0,0 +1,298 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.tests;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.KafkaThread;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.Grouped;
+import org.apache.kafka.streams.kstream.KGroupedStream;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.Produced;
+import org.apache.kafka.streams.kstream.Suppressed.BufferConfig;
+import org.apache.kafka.streams.kstream.TimeWindows;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.streams.state.WindowStore;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.kafka.streams.kstream.Suppressed.untilWindowCloses;
+
+public class SmokeTestClient extends SmokeTestUtil {

Review comment:
       copy/pasted

##########
File path: streams/upgrade-system-tests-22/src/test/java/org/apache/kafka/streams/tests/StreamsSmokeTest.java
##########
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.tests;
+
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.StreamsConfig;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.UUID;
+
+import static org.apache.kafka.streams.tests.SmokeTestDriver.generate;
+import static org.apache.kafka.streams.tests.SmokeTestDriver.generatePerpetually;
+
+public class StreamsSmokeTest {

Review comment:
       copy/pasted

##########
File path: tests/kafkatest/services/streams.py
##########
@@ -305,23 +305,62 @@ def start_node(self, node):
 class StreamsSmokeTestBaseService(StreamsTestBaseService):
     """Base class for Streams Smoke Test services providing some common settings and functionality"""
 
-    def __init__(self, test_context, kafka, command, processing_guarantee = 'at_least_once', num_threads = 3):
+    def __init__(self, test_context, kafka, command, processing_guarantee = 'at_least_once', num_threads = 3, replication_factor = 3):
         super(StreamsSmokeTestBaseService, self).__init__(test_context,
                                                           kafka,
                                                           "org.apache.kafka.streams.tests.StreamsSmokeTest",
                                                           command)
         self.NUM_THREADS = num_threads
         self.PROCESSING_GUARANTEE = processing_guarantee
+        self.KAFKA_STREAMS_VERSION = ""
+        self.UPGRADE_FROM = None
+        self.REPLICATION_FACTOR = replication_factor
+
+    def set_version(self, kafka_streams_version):
+        self.KAFKA_STREAMS_VERSION = kafka_streams_version
+
+    def set_upgrade_from(self, upgrade_from):
+        self.UPGRADE_FROM = upgrade_from

Review comment:
       some other stuff for the upgrade tests.

##########
File path: streams/upgrade-system-tests-24/src/test/java/org/apache/kafka/streams/tests/StreamsSmokeTest.java
##########
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.tests;
+
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.StreamsConfig;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.UUID;
+
+import static org.apache.kafka.streams.tests.SmokeTestDriver.generate;
+import static org.apache.kafka.streams.tests.SmokeTestDriver.generatePerpetually;
+
+public class StreamsSmokeTest {

Review comment:
       copy/pasted

##########
File path: tests/kafkatest/services/streams.py
##########
@@ -305,23 +305,62 @@ def start_node(self, node):
 class StreamsSmokeTestBaseService(StreamsTestBaseService):
     """Base class for Streams Smoke Test services providing some common settings and functionality"""
 
-    def __init__(self, test_context, kafka, command, processing_guarantee = 'at_least_once', num_threads = 3):
+    def __init__(self, test_context, kafka, command, processing_guarantee = 'at_least_once', num_threads = 3, replication_factor = 3):

Review comment:
       Added the ability to set this, so that we can just run one broker from the upgrade tests.

##########
File path: streams/upgrade-system-tests-25/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
##########
@@ -0,0 +1,622 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.tests;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
+
+import java.io.ByteArrayOutputStream;
+import java.io.PrintStream;
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static java.util.Collections.emptyMap;
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+
+public class SmokeTestDriver extends SmokeTestUtil {

Review comment:
       copy/pasted

##########
File path: streams/upgrade-system-tests-25/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java
##########
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.tests;
+
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.kstream.Initializer;
+import org.apache.kafka.streams.kstream.KeyValueMapper;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.ProcessorSupplier;
+
+import java.time.Instant;
+
+public class SmokeTestUtil {

Review comment:
       copy/pasted

##########
File path: streams/upgrade-system-tests-25/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java
##########
@@ -0,0 +1,298 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.tests;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.KafkaThread;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.Grouped;
+import org.apache.kafka.streams.kstream.KGroupedStream;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.Produced;
+import org.apache.kafka.streams.kstream.Suppressed.BufferConfig;
+import org.apache.kafka.streams.kstream.TimeWindows;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.streams.state.WindowStore;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.kafka.streams.kstream.Suppressed.untilWindowCloses;
+
+public class SmokeTestClient extends SmokeTestUtil {

Review comment:
       copy/pasted

##########
File path: tests/kafkatest/tests/streams/streams_upgrade_test.py
##########
@@ -37,6 +37,9 @@
 # can be replaced with metadata_2_versions
 backward_compatible_metadata_2_versions = [str(LATEST_0_10_2), str(LATEST_0_11_0), str(LATEST_1_0), str(LATEST_1_1)]
 metadata_3_or_higher_versions = [str(LATEST_2_0), str(LATEST_2_1), str(LATEST_2_2), str(LATEST_2_3), str(LATEST_2_4), str(LATEST_2_5), str(DEV_VERSION)]
+smoke_test_versions = [str(LATEST_2_2), str(LATEST_2_3), str(LATEST_2_4), str(LATEST_2_5)]

Review comment:
       Unfortunately, 2.1 doesn't work. See https://issues.apache.org/jira/browse/KAFKA-10203

##########
File path: tests/kafkatest/tests/streams/streams_upgrade_test.py
##########
@@ -189,8 +192,8 @@ def test_upgrade_downgrade_brokers(self, from_version, to_version):
         processor.stop()
         processor.node.account.ssh_capture("grep SMOKE-TEST-CLIENT-CLOSED %s" % processor.STDOUT_FILE, allow_fail=False)
 
-    @matrix(from_version=metadata_2_versions, to_version=metadata_2_versions)
-    def test_simple_upgrade_downgrade(self, from_version, to_version):
+    @matrix(from_version=smoke_test_versions, to_version=dev_version)
+    def test_app_upgrade(self, from_version, to_version):

Review comment:
       Changed the name to reflect that we're just testing upgrades now.

##########
File path: tests/kafkatest/services/streams.py
##########
@@ -305,23 +305,62 @@ def start_node(self, node):
 class StreamsSmokeTestBaseService(StreamsTestBaseService):
     """Base class for Streams Smoke Test services providing some common settings and functionality"""
 
-    def __init__(self, test_context, kafka, command, processing_guarantee = 'at_least_once', num_threads = 3):
+    def __init__(self, test_context, kafka, command, processing_guarantee = 'at_least_once', num_threads = 3, replication_factor = 3):
         super(StreamsSmokeTestBaseService, self).__init__(test_context,
                                                           kafka,
                                                           "org.apache.kafka.streams.tests.StreamsSmokeTest",
                                                           command)
         self.NUM_THREADS = num_threads
         self.PROCESSING_GUARANTEE = processing_guarantee
+        self.KAFKA_STREAMS_VERSION = ""
+        self.UPGRADE_FROM = None
+        self.REPLICATION_FACTOR = replication_factor
+
+    def set_version(self, kafka_streams_version):
+        self.KAFKA_STREAMS_VERSION = kafka_streams_version
+
+    def set_upgrade_from(self, upgrade_from):
+        self.UPGRADE_FROM = upgrade_from
 
     def prop_file(self):
         properties = {streams_property.STATE_DIR: self.PERSISTENT_ROOT,
                       streams_property.KAFKA_SERVERS: self.kafka.bootstrap_servers(),
                       streams_property.PROCESSING_GUARANTEE: self.PROCESSING_GUARANTEE,
-                      streams_property.NUM_THREADS: self.NUM_THREADS}
+                      streams_property.NUM_THREADS: self.NUM_THREADS,
+                      "replication.factor": self.REPLICATION_FACTOR,
+                      "num.standby.replicas": 2,
+                      "buffered.records.per.partition": 100,
+                      "commit.interval.ms": 1000,
+                      "auto.offset.reset": "earliest",
+                      "acks": "all"}

Review comment:
       moved from Java

##########
File path: tests/kafkatest/tests/streams/streams_upgrade_test.py
##########
@@ -201,14 +204,29 @@ def test_simple_upgrade_downgrade(self, from_version, to_version):
         self.zk = ZookeeperService(self.test_context, num_nodes=1)
         self.zk.start()
 
-        self.kafka = KafkaService(self.test_context, num_nodes=1, zk=self.zk, topics=self.topics)
+        self.kafka = KafkaService(self.test_context, num_nodes=1, zk=self.zk, topics={
+            'echo' : { 'partitions': 5, 'replication-factor': 1 },
+            'data' : { 'partitions': 5, 'replication-factor': 1 },
+            'min' : { 'partitions': 5, 'replication-factor': 1 },
+            'min-suppressed' : { 'partitions': 5, 'replication-factor': 1 },
+            'min-raw' : { 'partitions': 5, 'replication-factor': 1 },
+            'max' : { 'partitions': 5, 'replication-factor': 1 },
+            'sum' : { 'partitions': 5, 'replication-factor': 1 },
+            'sws-raw' : { 'partitions': 5, 'replication-factor': 1 },
+            'sws-suppressed' : { 'partitions': 5, 'replication-factor': 1 },
+            'dif' : { 'partitions': 5, 'replication-factor': 1 },
+            'cnt' : { 'partitions': 5, 'replication-factor': 1 },
+            'avg' : { 'partitions': 5, 'replication-factor': 1 },
+            'wcnt' : { 'partitions': 5, 'replication-factor': 1 },
+            'tagg' : { 'partitions': 5, 'replication-factor': 1 }
+        })

Review comment:
       required setup for the smoke test app

##########
File path: tests/kafkatest/tests/streams/streams_upgrade_test.py
##########
@@ -349,56 +370,42 @@ def get_version_string(self, version):
     def start_all_nodes_with(self, version):
         kafka_version_str = self.get_version_string(version)
 
-        # start first with <version>
         self.prepare_for(self.processor1, version)
-        node1 = self.processor1.node
-        with node1.account.monitor_log(self.processor1.STDOUT_FILE) as monitor:
-            with node1.account.monitor_log(self.processor1.LOG_FILE) as log_monitor:
-                self.processor1.start()
-                log_monitor.wait_until(kafka_version_str,
-                                       timeout_sec=60,
-                                       err_msg="Could not detect Kafka Streams version " + version + " " + str(node1.account))
-                monitor.wait_until(self.processed_msg,
-                                   timeout_sec=60,
-                                   err_msg="Never saw output '%s' on " % self.processed_msg + str(node1.account))
-
-        # start second with <version>
         self.prepare_for(self.processor2, version)
-        node2 = self.processor2.node
-        with node1.account.monitor_log(self.processor1.STDOUT_FILE) as first_monitor:
-            with node2.account.monitor_log(self.processor2.STDOUT_FILE) as second_monitor:
-                with node2.account.monitor_log(self.processor2.LOG_FILE) as log_monitor:
-                    self.processor2.start()
-                    log_monitor.wait_until(kafka_version_str,
-                                           timeout_sec=60,
-                                           err_msg="Could not detect Kafka Streams version " + version + " on " + str(node2.account))
-                    first_monitor.wait_until(self.processed_msg,
-                                             timeout_sec=60,
-                                             err_msg="Never saw output '%s' on " % self.processed_msg + str(node1.account))
-                    second_monitor.wait_until(self.processed_msg,
-                                              timeout_sec=60,
-                                              err_msg="Never saw output '%s' on " % self.processed_msg + str(node2.account))
-
-        # start third with <version>
         self.prepare_for(self.processor3, version)
-        node3 = self.processor3.node
-        with node1.account.monitor_log(self.processor1.STDOUT_FILE) as first_monitor:
-            with node2.account.monitor_log(self.processor2.STDOUT_FILE) as second_monitor:
-                with node3.account.monitor_log(self.processor3.STDOUT_FILE) as third_monitor:
-                    with node3.account.monitor_log(self.processor3.LOG_FILE) as log_monitor:
-                        self.processor3.start()
-                        log_monitor.wait_until(kafka_version_str,
-                                               timeout_sec=60,
-                                               err_msg="Could not detect Kafka Streams version " + version + " on " + str(node3.account))
-                        first_monitor.wait_until(self.processed_msg,
-                                                 timeout_sec=60,
-                                                 err_msg="Never saw output '%s' on " % self.processed_msg + str(node1.account))
-                        second_monitor.wait_until(self.processed_msg,
-                                                  timeout_sec=60,
-                                                  err_msg="Never saw output '%s' on " % self.processed_msg + str(node2.account))
-                        third_monitor.wait_until(self.processed_msg,
-                                                 timeout_sec=60,
-                                                 err_msg="Never saw output '%s' on " % self.processed_msg + str(node3.account))
+
+        self.processor1.start()
+        self.processor2.start()
+        self.processor3.start()

Review comment:
       I changed the "first time startup" method to just start all the instances at the same time, rather than waiting for them to start up and process one at a time.
   
   For the upgrade part of the test, we still do rolling upgrades.

##########
File path: streams/upgrade-system-tests-25/src/test/java/org/apache/kafka/streams/tests/StreamsSmokeTest.java
##########
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.tests;
+
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.StreamsConfig;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.UUID;
+
+import static org.apache.kafka.streams.tests.SmokeTestDriver.generate;
+import static org.apache.kafka.streams.tests.SmokeTestDriver.generatePerpetually;
+
+public class StreamsSmokeTest {

Review comment:
       copy/pasted

##########
File path: tests/kafkatest/tests/streams/streams_upgrade_test.py
##########
@@ -349,56 +370,42 @@ def get_version_string(self, version):
     def start_all_nodes_with(self, version):
         kafka_version_str = self.get_version_string(version)
 
-        # start first with <version>
         self.prepare_for(self.processor1, version)
-        node1 = self.processor1.node
-        with node1.account.monitor_log(self.processor1.STDOUT_FILE) as monitor:
-            with node1.account.monitor_log(self.processor1.LOG_FILE) as log_monitor:
-                self.processor1.start()
-                log_monitor.wait_until(kafka_version_str,
-                                       timeout_sec=60,
-                                       err_msg="Could not detect Kafka Streams version " + version + " " + str(node1.account))
-                monitor.wait_until(self.processed_msg,
-                                   timeout_sec=60,
-                                   err_msg="Never saw output '%s' on " % self.processed_msg + str(node1.account))
-
-        # start second with <version>
         self.prepare_for(self.processor2, version)
-        node2 = self.processor2.node
-        with node1.account.monitor_log(self.processor1.STDOUT_FILE) as first_monitor:
-            with node2.account.monitor_log(self.processor2.STDOUT_FILE) as second_monitor:
-                with node2.account.monitor_log(self.processor2.LOG_FILE) as log_monitor:
-                    self.processor2.start()
-                    log_monitor.wait_until(kafka_version_str,
-                                           timeout_sec=60,
-                                           err_msg="Could not detect Kafka Streams version " + version + " on " + str(node2.account))
-                    first_monitor.wait_until(self.processed_msg,
-                                             timeout_sec=60,
-                                             err_msg="Never saw output '%s' on " % self.processed_msg + str(node1.account))
-                    second_monitor.wait_until(self.processed_msg,
-                                              timeout_sec=60,
-                                              err_msg="Never saw output '%s' on " % self.processed_msg + str(node2.account))
-
-        # start third with <version>
         self.prepare_for(self.processor3, version)
-        node3 = self.processor3.node
-        with node1.account.monitor_log(self.processor1.STDOUT_FILE) as first_monitor:
-            with node2.account.monitor_log(self.processor2.STDOUT_FILE) as second_monitor:
-                with node3.account.monitor_log(self.processor3.STDOUT_FILE) as third_monitor:
-                    with node3.account.monitor_log(self.processor3.LOG_FILE) as log_monitor:
-                        self.processor3.start()
-                        log_monitor.wait_until(kafka_version_str,
-                                               timeout_sec=60,
-                                               err_msg="Could not detect Kafka Streams version " + version + " on " + str(node3.account))
-                        first_monitor.wait_until(self.processed_msg,
-                                                 timeout_sec=60,
-                                                 err_msg="Never saw output '%s' on " % self.processed_msg + str(node1.account))
-                        second_monitor.wait_until(self.processed_msg,
-                                                  timeout_sec=60,
-                                                  err_msg="Never saw output '%s' on " % self.processed_msg + str(node2.account))
-                        third_monitor.wait_until(self.processed_msg,
-                                                 timeout_sec=60,
-                                                 err_msg="Never saw output '%s' on " % self.processed_msg + str(node3.account))
+
+        self.processor1.start()
+        self.processor2.start()
+        self.processor3.start()
+
+        # double-check the version
+        self.wait_for_verification(self.processor1, kafka_version_str, self.processor1.LOG_FILE)
+        self.wait_for_verification(self.processor2, kafka_version_str, self.processor2.LOG_FILE)
+        self.wait_for_verification(self.processor3, kafka_version_str, self.processor3.LOG_FILE)
+
+        # wait for the members to join
+        self.wait_for_verification(self.processor1, "SMOKE-TEST-CLIENT-STARTED", self.processor1.STDOUT_FILE)
+        self.wait_for_verification(self.processor2, "SMOKE-TEST-CLIENT-STARTED", self.processor2.STDOUT_FILE)
+        self.wait_for_verification(self.processor3, "SMOKE-TEST-CLIENT-STARTED", self.processor3.STDOUT_FILE)
+
+        # make sure they've processed something
+        self.wait_for_verification(self.processor1, self.processed_msg, self.processor1.STDOUT_FILE)
+        self.wait_for_verification(self.processor2, self.processed_msg, self.processor2.STDOUT_FILE)
+        self.wait_for_verification(self.processor3, self.processed_msg, self.processor3.STDOUT_FILE)
+
+    def wait_for_verification(self, processor, message, file, num_lines=1):
+        wait_until(lambda: self.verify_from_file(processor, message, file) >= num_lines,
+                   timeout_sec=60,
+                   err_msg="Did expect to read '%s' from %s" % (message, processor.node.account))
+
+    @staticmethod
+    def verify_from_file(processor, message, file):

Review comment:
       Copied this over here so that we don't have to mess with monitors if we just want to search from the beginning of the files.

##########
File path: tests/kafkatest/tests/streams/streams_upgrade_test.py
##########
@@ -349,56 +370,42 @@ def get_version_string(self, version):
     def start_all_nodes_with(self, version):
         kafka_version_str = self.get_version_string(version)
 
-        # start first with <version>
         self.prepare_for(self.processor1, version)
-        node1 = self.processor1.node
-        with node1.account.monitor_log(self.processor1.STDOUT_FILE) as monitor:
-            with node1.account.monitor_log(self.processor1.LOG_FILE) as log_monitor:
-                self.processor1.start()
-                log_monitor.wait_until(kafka_version_str,
-                                       timeout_sec=60,
-                                       err_msg="Could not detect Kafka Streams version " + version + " " + str(node1.account))
-                monitor.wait_until(self.processed_msg,
-                                   timeout_sec=60,
-                                   err_msg="Never saw output '%s' on " % self.processed_msg + str(node1.account))
-
-        # start second with <version>
         self.prepare_for(self.processor2, version)
-        node2 = self.processor2.node
-        with node1.account.monitor_log(self.processor1.STDOUT_FILE) as first_monitor:
-            with node2.account.monitor_log(self.processor2.STDOUT_FILE) as second_monitor:
-                with node2.account.monitor_log(self.processor2.LOG_FILE) as log_monitor:
-                    self.processor2.start()
-                    log_monitor.wait_until(kafka_version_str,
-                                           timeout_sec=60,
-                                           err_msg="Could not detect Kafka Streams version " + version + " on " + str(node2.account))
-                    first_monitor.wait_until(self.processed_msg,
-                                             timeout_sec=60,
-                                             err_msg="Never saw output '%s' on " % self.processed_msg + str(node1.account))
-                    second_monitor.wait_until(self.processed_msg,
-                                              timeout_sec=60,
-                                              err_msg="Never saw output '%s' on " % self.processed_msg + str(node2.account))
-
-        # start third with <version>
         self.prepare_for(self.processor3, version)
-        node3 = self.processor3.node
-        with node1.account.monitor_log(self.processor1.STDOUT_FILE) as first_monitor:
-            with node2.account.monitor_log(self.processor2.STDOUT_FILE) as second_monitor:
-                with node3.account.monitor_log(self.processor3.STDOUT_FILE) as third_monitor:
-                    with node3.account.monitor_log(self.processor3.LOG_FILE) as log_monitor:
-                        self.processor3.start()
-                        log_monitor.wait_until(kafka_version_str,
-                                               timeout_sec=60,
-                                               err_msg="Could not detect Kafka Streams version " + version + " on " + str(node3.account))
-                        first_monitor.wait_until(self.processed_msg,
-                                                 timeout_sec=60,
-                                                 err_msg="Never saw output '%s' on " % self.processed_msg + str(node1.account))
-                        second_monitor.wait_until(self.processed_msg,
-                                                  timeout_sec=60,
-                                                  err_msg="Never saw output '%s' on " % self.processed_msg + str(node2.account))
-                        third_monitor.wait_until(self.processed_msg,
-                                                 timeout_sec=60,
-                                                 err_msg="Never saw output '%s' on " % self.processed_msg + str(node3.account))
+
+        self.processor1.start()
+        self.processor2.start()
+        self.processor3.start()
+
+        # double-check the version
+        self.wait_for_verification(self.processor1, kafka_version_str, self.processor1.LOG_FILE)
+        self.wait_for_verification(self.processor2, kafka_version_str, self.processor2.LOG_FILE)
+        self.wait_for_verification(self.processor3, kafka_version_str, self.processor3.LOG_FILE)
+
+        # wait for the members to join

Review comment:
       Here's where we use that new output line after synchronously waiting to join the group in `start()`




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vvcephei commented on pull request #8938: KAFKA-10173: Use SmokeTest for upgrade system tests

Posted by GitBox <gi...@apache.org>.
vvcephei commented on pull request #8938:
URL: https://github.com/apache/kafka/pull/8938#issuecomment-652171893


   Oof, looks like I missed "something": http://confluent-kafka-branch-builder-system-test-results.s3-us-west-2.amazonaws.com/2020-06-30--001.1593562449--vvcephei--kafka-10173-upgrde-smoke-system-test--1123f6f2c/report.html


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vvcephei commented on a change in pull request #8938: KAFKA-10173: Use SmokeTest for upgrade system tests

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #8938:
URL: https://github.com/apache/kafka/pull/8938#discussion_r448531033



##########
File path: tests/kafkatest/services/streams.py
##########
@@ -48,6 +48,15 @@ class StreamsTestBaseService(KafkaPathResolverMixin, JmxMixin, Service):
         "streams_config": {
             "path": CONFIG_FILE,
             "collect_default": True},
+        "streams_config.1": {
+            "path": CONFIG_FILE + ".1",
+            "collect_default": True},
+        "streams_config.0-1": {
+            "path": CONFIG_FILE + ".0-1",
+            "collect_default": True},
+        "streams_config.1-1": {
+            "path": CONFIG_FILE + ".1-1",
+            "collect_default": True},

Review comment:
       Rotating the config file as well as the logs gives us better visibility for debugging.

##########
File path: tests/kafkatest/tests/streams/streams_application_upgrade_test.py
##########
@@ -0,0 +1,297 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import random
+from ducktape.mark import matrix
+from ducktape.tests.test import Test
+from ducktape.utils.util import wait_until
+from kafkatest.services.kafka import KafkaService
+from kafkatest.services.streams import StreamsSmokeTestDriverService, StreamsSmokeTestJobRunnerService
+from kafkatest.services.zookeeper import ZookeeperService
+from kafkatest.version import LATEST_2_2, LATEST_2_3, LATEST_2_4, LATEST_2_5, DEV_VERSION, KafkaVersion
+
+smoke_test_versions = [str(LATEST_2_2), str(LATEST_2_3), str(LATEST_2_4), str(LATEST_2_5)]
+dev_version = [str(DEV_VERSION)]
+
+class StreamsUpgradeTest(Test):
+    """
+    Test upgrading Kafka Streams (all version combination)
+    If metadata was changes, upgrade is more difficult
+    Metadata version was bumped in 0.10.1.0 and
+    subsequently bumped in 2.0.0
+    """
+
+    def __init__(self, test_context):
+        super(StreamsUpgradeTest, self).__init__(test_context)
+        self.topics = {
+            'echo' : { 'partitions': 5 },
+            'data' : { 'partitions': 5 },
+        }
+
+    processed_msg = "processed [0-9]* records"
+    base_version_number = str(DEV_VERSION).split("-")[0]
+
+    def perform_broker_upgrade(self, to_version):
+        self.logger.info("First pass bounce - rolling broker upgrade")
+        for node in self.kafka.nodes:
+            self.kafka.stop_node(node)
+            node.version = KafkaVersion(to_version)
+            self.kafka.start_node(node)
+
+    @matrix(from_version=smoke_test_versions, to_version=dev_version, bounce_type=["full"])

Review comment:
       During a rolling bounce, we might see an old node try to read the changelog records of a new node, which would fail. All you have to do to recover is to continue upgrading, but we can't tolerate the failure in this system test. This is the exact same failure that we would see if we tried to downgrade.
   
   To repair the non-downgradability of Streams (and hence allow rolling bounces to work without errors), we need to go back and release forward-compatible versions of older releases. For example, 2.5.1 and 2.4.2 would be such versions. I'll create a ticket to come back here and add the missing matrix coordinates once those versions are released. Additionally, when new versions are released, we should add them to the downgrade matrix and the rolling-bounce matrix to prevent this situation from arising again.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vvcephei commented on a change in pull request #8938: KAFKA-10173: Use SmokeTest for upgrade system tests

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #8938:
URL: https://github.com/apache/kafka/pull/8938#discussion_r448584762



##########
File path: tests/kafkatest/tests/streams/streams_upgrade_test.py
##########
@@ -189,54 +189,6 @@ def test_upgrade_downgrade_brokers(self, from_version, to_version):
         processor.stop()
         processor.node.account.ssh_capture("grep SMOKE-TEST-CLIENT-CLOSED %s" % processor.STDOUT_FILE, allow_fail=False)
 
-    @matrix(from_version=metadata_2_versions, to_version=metadata_2_versions)
-    def test_simple_upgrade_downgrade(self, from_version, to_version):

Review comment:
       Aha! For that, I've been preparing a separate PR: https://github.com/apache/kafka/pull/8971




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] guozhangwang commented on pull request #8938: KAFKA-10173: Use SmokeTest for upgrade system tests

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on pull request #8938:
URL: https://github.com/apache/kafka/pull/8938#issuecomment-652681930


   LGTM.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vvcephei commented on pull request #8938: KAFKA-10173: Use SmokeTest for upgrade system tests

Posted by GitBox <gi...@apache.org>.
vvcephei commented on pull request #8938:
URL: https://github.com/apache/kafka/pull/8938#issuecomment-653255715


   The full streams suite completed. There was just one unrelated failure: http://confluent-kafka-branch-builder-system-test-results.s3-us-west-2.amazonaws.com/2020-07-01--001.1593635888--vvcephei--kafka-10173-upgrde-smoke-system-test--131f98db8/report.html


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] guozhangwang commented on a change in pull request #8938: KAFKA-10173: Use SmokeTest for upgrade system tests

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on a change in pull request #8938:
URL: https://github.com/apache/kafka/pull/8938#discussion_r448569679



##########
File path: streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java
##########
@@ -38,107 +37,128 @@
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.state.Stores;
 import org.apache.kafka.streams.state.WindowStore;
-import org.apache.kafka.test.TestUtils;
 
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
 import java.time.Duration;
 import java.time.Instant;
 import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 
 import static org.apache.kafka.streams.kstream.Suppressed.untilWindowCloses;
 
 public class SmokeTestClient extends SmokeTestUtil {
 
     private final String name;
 
-    private Thread thread;
     private KafkaStreams streams;
     private boolean uncaughtException = false;
-    private boolean started;
-    private boolean closed;
+    private volatile boolean closed;
 
-    public SmokeTestClient(final String name) {
-        super();
-        this.name = name;
+    private static void addShutdownHook(final String name, final Runnable runnable) {

Review comment:
       Sounds good.

##########
File path: streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java
##########
@@ -38,107 +37,128 @@
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.state.Stores;
 import org.apache.kafka.streams.state.WindowStore;
-import org.apache.kafka.test.TestUtils;
 
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
 import java.time.Duration;
 import java.time.Instant;
 import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 
 import static org.apache.kafka.streams.kstream.Suppressed.untilWindowCloses;
 
 public class SmokeTestClient extends SmokeTestUtil {
 
     private final String name;
 
-    private Thread thread;
     private KafkaStreams streams;
     private boolean uncaughtException = false;
-    private boolean started;
-    private boolean closed;
+    private volatile boolean closed;
 
-    public SmokeTestClient(final String name) {
-        super();
-        this.name = name;
+    private static void addShutdownHook(final String name, final Runnable runnable) {
+        if (name != null) {
+            Runtime.getRuntime().addShutdownHook(KafkaThread.nonDaemon(name, runnable));
+        } else {
+            Runtime.getRuntime().addShutdownHook(new Thread(runnable));
+        }
     }
 
-    public boolean started() {
-        return started;
+    private static File tempDirectory() {
+        final String prefix = "kafka-";
+        final File file;
+        try {
+            file = Files.createTempDirectory(prefix).toFile();
+        } catch (final IOException ex) {
+            throw new RuntimeException("Failed to create a temp dir", ex);
+        }
+        file.deleteOnExit();

Review comment:
       Do we still need a shutdown hook to delete with `file.deleteOnExit();`? I think the latter can still be triggered even with abnormal exits.

##########
File path: tests/kafkatest/tests/streams/streams_application_upgrade_test.py
##########
@@ -0,0 +1,297 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import random
+from ducktape.mark import matrix
+from ducktape.tests.test import Test
+from ducktape.utils.util import wait_until
+from kafkatest.services.kafka import KafkaService
+from kafkatest.services.streams import StreamsSmokeTestDriverService, StreamsSmokeTestJobRunnerService
+from kafkatest.services.zookeeper import ZookeeperService
+from kafkatest.version import LATEST_2_2, LATEST_2_3, LATEST_2_4, LATEST_2_5, DEV_VERSION, KafkaVersion
+
+smoke_test_versions = [str(LATEST_2_2), str(LATEST_2_3), str(LATEST_2_4), str(LATEST_2_5)]
+dev_version = [str(DEV_VERSION)]
+
+class StreamsUpgradeTest(Test):
+    """
+    Test upgrading Kafka Streams (all version combination)
+    If metadata was changes, upgrade is more difficult
+    Metadata version was bumped in 0.10.1.0 and
+    subsequently bumped in 2.0.0
+    """
+
+    def __init__(self, test_context):
+        super(StreamsUpgradeTest, self).__init__(test_context)
+        self.topics = {
+            'echo' : { 'partitions': 5 },
+            'data' : { 'partitions': 5 },
+        }
+
+    processed_msg = "processed [0-9]* records"
+    base_version_number = str(DEV_VERSION).split("-")[0]
+
+    def perform_broker_upgrade(self, to_version):
+        self.logger.info("First pass bounce - rolling broker upgrade")
+        for node in self.kafka.nodes:
+            self.kafka.stop_node(node)
+            node.version = KafkaVersion(to_version)
+            self.kafka.start_node(node)
+
+    @matrix(from_version=smoke_test_versions, to_version=dev_version, bounce_type=["full"])

Review comment:
       `All you have to do to recover is to continue upgrading, but we can't tolerate the failure in this system test. ` how this is handled now?

##########
File path: streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java
##########
@@ -38,107 +37,128 @@
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.state.Stores;
 import org.apache.kafka.streams.state.WindowStore;
-import org.apache.kafka.test.TestUtils;
 
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
 import java.time.Duration;
 import java.time.Instant;
 import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 
 import static org.apache.kafka.streams.kstream.Suppressed.untilWindowCloses;
 
 public class SmokeTestClient extends SmokeTestUtil {
 
     private final String name;
 
-    private Thread thread;
     private KafkaStreams streams;
     private boolean uncaughtException = false;
-    private boolean started;
-    private boolean closed;
+    private volatile boolean closed;
 
-    public SmokeTestClient(final String name) {
-        super();
-        this.name = name;
+    private static void addShutdownHook(final String name, final Runnable runnable) {
+        if (name != null) {
+            Runtime.getRuntime().addShutdownHook(KafkaThread.nonDaemon(name, runnable));
+        } else {
+            Runtime.getRuntime().addShutdownHook(new Thread(runnable));
+        }
     }
 
-    public boolean started() {
-        return started;
+    private static File tempDirectory() {
+        final String prefix = "kafka-";
+        final File file;
+        try {
+            file = Files.createTempDirectory(prefix).toFile();
+        } catch (final IOException ex) {
+            throw new RuntimeException("Failed to create a temp dir", ex);
+        }
+        file.deleteOnExit();
+
+        addShutdownHook("delete-temp-file-shutdown-hook", () -> {
+            try {
+                Utils.delete(file);
+            } catch (final IOException e) {
+                System.out.println("Error deleting " + file.getAbsolutePath());
+                e.printStackTrace(System.out);
+            }
+        });
+
+        return file;
+    }
+
+    public SmokeTestClient(final String name) {
+        this.name = name;
     }
 
     public boolean closed() {
         return closed;
     }
 
     public void start(final Properties streamsProperties) {
-        streams = createKafkaStreams(streamsProperties);
+        final Topology build = getTopology();
+        streams = new KafkaStreams(build, getStreamsConfig(streamsProperties));
+
+        final CountDownLatch countDownLatch = new CountDownLatch(1);
+        streams.setStateListener((newState, oldState) -> {
+            System.out.printf("%s %s: %s -> %s%n", name, Instant.now(), oldState, newState);
+            if (oldState == KafkaStreams.State.REBALANCING && newState == KafkaStreams.State.RUNNING) {
+                countDownLatch.countDown();
+            }
+
+            if (newState == KafkaStreams.State.NOT_RUNNING) {
+                closed = true;
+            }
+        });
+
         streams.setUncaughtExceptionHandler((t, e) -> {
             System.out.println(name + ": SMOKE-TEST-CLIENT-EXCEPTION");
+            System.out.println(name + ": FATAL: An unexpected exception is encountered on thread " + t + ": " + e);
+            e.printStackTrace(System.out);
             uncaughtException = true;
-            e.printStackTrace();
+            streams.close(Duration.ofSeconds(30));
         });
 
-        Exit.addShutdownHook("streams-shutdown-hook", () -> close());
+        addShutdownHook("streams-shutdown-hook", this::close);
 
-        thread = new Thread(() -> streams.start());
-        thread.start();
+        streams.start();
+        try {
+            if (!countDownLatch.await(1, TimeUnit.MINUTES)) {
+                System.out.println(name + ": SMOKE-TEST-CLIENT-EXCEPTION: Didn't start in one minute");
+            }
+        } catch (final InterruptedException e) {
+            System.out.println(name + ": SMOKE-TEST-CLIENT-EXCEPTION: " + e);
+            e.printStackTrace(System.out);
+        }
+        System.out.println(name + ": SMOKE-TEST-CLIENT-STARTED");
+        System.out.println(name + " started at " + Instant.now());
     }
 
     public void closeAsync() {
         streams.close(Duration.ZERO);
     }
 
     public void close() {
-        streams.close(Duration.ofSeconds(5));
-        // do not remove these printouts since they are needed for health scripts
-        if (!uncaughtException) {
+        final boolean wasClosed = streams.close(Duration.ofMinutes(1));

Review comment:
       +1

##########
File path: tests/kafkatest/services/streams.py
##########
@@ -48,6 +48,15 @@ class StreamsTestBaseService(KafkaPathResolverMixin, JmxMixin, Service):
         "streams_config": {
             "path": CONFIG_FILE,
             "collect_default": True},
+        "streams_config.1": {
+            "path": CONFIG_FILE + ".1",
+            "collect_default": True},
+        "streams_config.0-1": {
+            "path": CONFIG_FILE + ".0-1",
+            "collect_default": True},
+        "streams_config.1-1": {
+            "path": CONFIG_FILE + ".1-1",
+            "collect_default": True},

Review comment:
       Why only .1 / 0-1/ 1-1? There are more rotated log files below?

##########
File path: tests/kafkatest/tests/streams/streams_upgrade_test.py
##########
@@ -189,54 +189,6 @@ def test_upgrade_downgrade_brokers(self, from_version, to_version):
         processor.stop()
         processor.node.account.ssh_capture("grep SMOKE-TEST-CLIENT-CLOSED %s" % processor.STDOUT_FILE, allow_fail=False)
 
-    @matrix(from_version=metadata_2_versions, to_version=metadata_2_versions)
-    def test_simple_upgrade_downgrade(self, from_version, to_version):

Review comment:
       Should we merge the `test_metadata_upgrade` below to the new file? I think with the new principle it is less meaningful to test the upgrade path from an old v1 to another old v2, since, the test of v2 should already cover that. 

##########
File path: streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java
##########
@@ -38,107 +37,128 @@
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.state.Stores;
 import org.apache.kafka.streams.state.WindowStore;
-import org.apache.kafka.test.TestUtils;
 
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
 import java.time.Duration;
 import java.time.Instant;
 import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 
 import static org.apache.kafka.streams.kstream.Suppressed.untilWindowCloses;
 
 public class SmokeTestClient extends SmokeTestUtil {
 
     private final String name;
 
-    private Thread thread;
     private KafkaStreams streams;
     private boolean uncaughtException = false;
-    private boolean started;
-    private boolean closed;
+    private volatile boolean closed;
 
-    public SmokeTestClient(final String name) {
-        super();
-        this.name = name;
+    private static void addShutdownHook(final String name, final Runnable runnable) {
+        if (name != null) {
+            Runtime.getRuntime().addShutdownHook(KafkaThread.nonDaemon(name, runnable));
+        } else {
+            Runtime.getRuntime().addShutdownHook(new Thread(runnable));
+        }
     }
 
-    public boolean started() {
-        return started;
+    private static File tempDirectory() {
+        final String prefix = "kafka-";
+        final File file;
+        try {
+            file = Files.createTempDirectory(prefix).toFile();
+        } catch (final IOException ex) {
+            throw new RuntimeException("Failed to create a temp dir", ex);
+        }
+        file.deleteOnExit();
+
+        addShutdownHook("delete-temp-file-shutdown-hook", () -> {
+            try {
+                Utils.delete(file);
+            } catch (final IOException e) {
+                System.out.println("Error deleting " + file.getAbsolutePath());
+                e.printStackTrace(System.out);
+            }
+        });
+
+        return file;
+    }
+
+    public SmokeTestClient(final String name) {
+        this.name = name;
     }
 
     public boolean closed() {
         return closed;
     }
 
     public void start(final Properties streamsProperties) {
-        streams = createKafkaStreams(streamsProperties);
+        final Topology build = getTopology();
+        streams = new KafkaStreams(build, getStreamsConfig(streamsProperties));
+
+        final CountDownLatch countDownLatch = new CountDownLatch(1);
+        streams.setStateListener((newState, oldState) -> {
+            System.out.printf("%s %s: %s -> %s%n", name, Instant.now(), oldState, newState);
+            if (oldState == KafkaStreams.State.REBALANCING && newState == KafkaStreams.State.RUNNING) {
+                countDownLatch.countDown();
+            }
+
+            if (newState == KafkaStreams.State.NOT_RUNNING) {
+                closed = true;
+            }
+        });
+
         streams.setUncaughtExceptionHandler((t, e) -> {
             System.out.println(name + ": SMOKE-TEST-CLIENT-EXCEPTION");
+            System.out.println(name + ": FATAL: An unexpected exception is encountered on thread " + t + ": " + e);
+            e.printStackTrace(System.out);
             uncaughtException = true;
-            e.printStackTrace();
+            streams.close(Duration.ofSeconds(30));
         });
 
-        Exit.addShutdownHook("streams-shutdown-hook", () -> close());
+        addShutdownHook("streams-shutdown-hook", this::close);
 
-        thread = new Thread(() -> streams.start());
-        thread.start();
+        streams.start();
+        try {
+            if (!countDownLatch.await(1, TimeUnit.MINUTES)) {
+                System.out.println(name + ": SMOKE-TEST-CLIENT-EXCEPTION: Didn't start in one minute");
+            }
+        } catch (final InterruptedException e) {
+            System.out.println(name + ": SMOKE-TEST-CLIENT-EXCEPTION: " + e);
+            e.printStackTrace(System.out);
+        }
+        System.out.println(name + ": SMOKE-TEST-CLIENT-STARTED");
+        System.out.println(name + " started at " + Instant.now());
     }
 
     public void closeAsync() {
         streams.close(Duration.ZERO);
     }
 
     public void close() {
-        streams.close(Duration.ofSeconds(5));
-        // do not remove these printouts since they are needed for health scripts
-        if (!uncaughtException) {
+        final boolean wasClosed = streams.close(Duration.ofMinutes(1));
+
+        if (wasClosed && !uncaughtException) {
             System.out.println(name + ": SMOKE-TEST-CLIENT-CLOSED");
-        }
-        try {
-            thread.join();
-        } catch (final Exception ex) {
-            // do not remove these printouts since they are needed for health scripts
+        } else if (wasClosed) {
             System.out.println(name + ": SMOKE-TEST-CLIENT-EXCEPTION");
-            // ignore
+        } else {
+            System.out.println(name + ": SMOKE-TEST-CLIENT-EXCEPTION: Didn't close");

Review comment:
       `nit: Didn't close in time`?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] guozhangwang commented on a change in pull request #8938: KAFKA-10173: Use SmokeTest for upgrade system tests

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on a change in pull request #8938:
URL: https://github.com/apache/kafka/pull/8938#discussion_r449684173



##########
File path: tests/kafkatest/tests/streams/streams_application_upgrade_test.py
##########
@@ -0,0 +1,297 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import random
+from ducktape.mark import matrix
+from ducktape.tests.test import Test
+from ducktape.utils.util import wait_until
+from kafkatest.services.kafka import KafkaService
+from kafkatest.services.streams import StreamsSmokeTestDriverService, StreamsSmokeTestJobRunnerService
+from kafkatest.services.zookeeper import ZookeeperService
+from kafkatest.version import LATEST_2_2, LATEST_2_3, LATEST_2_4, LATEST_2_5, DEV_VERSION, KafkaVersion
+
+smoke_test_versions = [str(LATEST_2_2), str(LATEST_2_3), str(LATEST_2_4), str(LATEST_2_5)]
+dev_version = [str(DEV_VERSION)]
+
+class StreamsUpgradeTest(Test):
+    """
+    Test upgrading Kafka Streams (all version combination)
+    If metadata was changes, upgrade is more difficult
+    Metadata version was bumped in 0.10.1.0 and
+    subsequently bumped in 2.0.0
+    """
+
+    def __init__(self, test_context):
+        super(StreamsUpgradeTest, self).__init__(test_context)
+        self.topics = {
+            'echo' : { 'partitions': 5 },
+            'data' : { 'partitions': 5 },
+        }
+
+    processed_msg = "processed [0-9]* records"
+    base_version_number = str(DEV_VERSION).split("-")[0]
+
+    def perform_broker_upgrade(self, to_version):
+        self.logger.info("First pass bounce - rolling broker upgrade")
+        for node in self.kafka.nodes:
+            self.kafka.stop_node(node)
+            node.version = KafkaVersion(to_version)
+            self.kafka.start_node(node)
+
+    @matrix(from_version=smoke_test_versions, to_version=dev_version, bounce_type=["full"])

Review comment:
       Sounds great, thank you!!




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vvcephei merged pull request #8938: KAFKA-10173: Use SmokeTest for upgrade system tests

Posted by GitBox <gi...@apache.org>.
vvcephei merged pull request #8938:
URL: https://github.com/apache/kafka/pull/8938


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vvcephei commented on pull request #8938: KAFKA-10173: Use SmokeTest for upgrade system tests

Posted by GitBox <gi...@apache.org>.
vvcephei commented on pull request #8938:
URL: https://github.com/apache/kafka/pull/8938#issuecomment-652572009


   The existing `streams_upgrade_test.py` passes:
   http://confluent-kafka-branch-builder-system-test-results.s3-us-west-2.amazonaws.com/2020-07-01--001.1593623647--vvcephei--kafka-10173-upgrde-smoke-system-test--be475ec5e/report.html
   
   The new `streams_application_upgrade_test.py` passes:
   http://confluent-kafka-branch-builder-system-test-results.s3-us-west-2.amazonaws.com/2020-07-01--001.1593622020--vvcephei--kafka-10173-upgrde-smoke-system-test--131f98db8/report.html
   
   The full streams test suite is ongoing: https://jenkins.confluent.io/job/system-test-kafka-branch-builder/4007/console


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vvcephei commented on a change in pull request #8938: KAFKA-10173: Use SmokeTest for upgrade system tests

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #8938:
URL: https://github.com/apache/kafka/pull/8938#discussion_r448704043



##########
File path: tests/kafkatest/tests/streams/streams_application_upgrade_test.py
##########
@@ -0,0 +1,297 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import random
+from ducktape.mark import matrix
+from ducktape.tests.test import Test
+from ducktape.utils.util import wait_until
+from kafkatest.services.kafka import KafkaService
+from kafkatest.services.streams import StreamsSmokeTestDriverService, StreamsSmokeTestJobRunnerService
+from kafkatest.services.zookeeper import ZookeeperService
+from kafkatest.version import LATEST_2_2, LATEST_2_3, LATEST_2_4, LATEST_2_5, DEV_VERSION, KafkaVersion
+
+smoke_test_versions = [str(LATEST_2_2), str(LATEST_2_3), str(LATEST_2_4), str(LATEST_2_5)]
+dev_version = [str(DEV_VERSION)]
+
+class StreamsUpgradeTest(Test):
+    """
+    Test upgrading Kafka Streams (all version combination)
+    If metadata was changes, upgrade is more difficult
+    Metadata version was bumped in 0.10.1.0 and
+    subsequently bumped in 2.0.0
+    """
+
+    def __init__(self, test_context):
+        super(StreamsUpgradeTest, self).__init__(test_context)
+        self.topics = {
+            'echo' : { 'partitions': 5 },
+            'data' : { 'partitions': 5 },
+        }
+
+    processed_msg = "processed [0-9]* records"
+    base_version_number = str(DEV_VERSION).split("-")[0]
+
+    def perform_broker_upgrade(self, to_version):
+        self.logger.info("First pass bounce - rolling broker upgrade")
+        for node in self.kafka.nodes:
+            self.kafka.stop_node(node)
+            node.version = KafkaVersion(to_version)
+            self.kafka.start_node(node)
+
+    @matrix(from_version=smoke_test_versions, to_version=dev_version, bounce_type=["full"])

Review comment:
       Ah, thanks for the clarification. Yes, I was thinking we would just add new matrices for both rolling-bounceable and downgradable versions. E.g., after the two pending releases, we'd also have:
   
   ```
   @matrix(from=[2.5.1, 2.6.0], to=[DEV_VERSION], bounce_type=[full, rolling])
   @matrix(from=[DEV_VERSION], to=[2.5.1, 2.6.0], bounce_type=[full, rolling])
   ```
   
   (just for illustrative purposes, we'd probably declare a variable to make it maintainable)




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] guozhangwang commented on pull request #8938: KAFKA-10173: Use SmokeTest for upgrade system tests

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on pull request #8938:
URL: https://github.com/apache/kafka/pull/8938#issuecomment-651413272


   @vvcephei I have a couple meta questions about the missing tests: in `test_metadata_upgrade` we are testing upgrading from `metadata_2_versions` to `metadata_3_or_higher_versions`, where `metadata_3_or_higher_versions` includes up to trunk versions, would that provide some coverage?
   
   A second question is that, we should still keep the "downgrade" test from `trunk` to the specified old version, right?


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vvcephei commented on pull request #8938: KAFKA-10173: Use SmokeTest for upgrade system tests

Posted by GitBox <gi...@apache.org>.
vvcephei commented on pull request #8938:
URL: https://github.com/apache/kafka/pull/8938#issuecomment-653256554


   Thanks, @guozhangwang ! 
   
   I'll start the process of backporting this.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vvcephei commented on a change in pull request #8938: KAFKA-10173: Use SmokeTest for upgrade system tests

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #8938:
URL: https://github.com/apache/kafka/pull/8938#discussion_r448583306



##########
File path: tests/kafkatest/services/streams.py
##########
@@ -48,6 +48,15 @@ class StreamsTestBaseService(KafkaPathResolverMixin, JmxMixin, Service):
         "streams_config": {
             "path": CONFIG_FILE,
             "collect_default": True},
+        "streams_config.1": {
+            "path": CONFIG_FILE + ".1",
+            "collect_default": True},
+        "streams_config.0-1": {
+            "path": CONFIG_FILE + ".0-1",
+            "collect_default": True},
+        "streams_config.1-1": {
+            "path": CONFIG_FILE + ".1-1",
+            "collect_default": True},

Review comment:
       Yep, but those are for other tests. The upgrade tests only roll one time. I wanted to try and avoid touching any other tests in this PR, so I only implemented config file rotation for the new test.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vvcephei commented on pull request #8938: KAFKA-10173: Use SmokeTest for upgrade system tests

Posted by GitBox <gi...@apache.org>.
vvcephei commented on pull request #8938:
URL: https://github.com/apache/kafka/pull/8938#issuecomment-653256091


   The test failures were just the usual suspects:
   ```
   org.apache.kafka.connect.mirror.MirrorConnectorsIntegrationTest.testOneWayReplicationWithAutorOffsetSync1
   org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta[false/true]
   ```


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vvcephei commented on a change in pull request #8938: KAFKA-10173: Use SmokeTest for upgrade system tests

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #8938:
URL: https://github.com/apache/kafka/pull/8938#discussion_r448586796



##########
File path: tests/kafkatest/tests/streams/streams_application_upgrade_test.py
##########
@@ -0,0 +1,297 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import random
+from ducktape.mark import matrix
+from ducktape.tests.test import Test
+from ducktape.utils.util import wait_until
+from kafkatest.services.kafka import KafkaService
+from kafkatest.services.streams import StreamsSmokeTestDriverService, StreamsSmokeTestJobRunnerService
+from kafkatest.services.zookeeper import ZookeeperService
+from kafkatest.version import LATEST_2_2, LATEST_2_3, LATEST_2_4, LATEST_2_5, DEV_VERSION, KafkaVersion
+
+smoke_test_versions = [str(LATEST_2_2), str(LATEST_2_3), str(LATEST_2_4), str(LATEST_2_5)]
+dev_version = [str(DEV_VERSION)]
+
+class StreamsUpgradeTest(Test):
+    """
+    Test upgrading Kafka Streams (all version combination)
+    If metadata was changes, upgrade is more difficult
+    Metadata version was bumped in 0.10.1.0 and
+    subsequently bumped in 2.0.0
+    """
+
+    def __init__(self, test_context):
+        super(StreamsUpgradeTest, self).__init__(test_context)
+        self.topics = {
+            'echo' : { 'partitions': 5 },
+            'data' : { 'partitions': 5 },
+        }
+
+    processed_msg = "processed [0-9]* records"
+    base_version_number = str(DEV_VERSION).split("-")[0]
+
+    def perform_broker_upgrade(self, to_version):
+        self.logger.info("First pass bounce - rolling broker upgrade")
+        for node in self.kafka.nodes:
+            self.kafka.stop_node(node)
+            node.version = KafkaVersion(to_version)
+            self.kafka.start_node(node)
+
+    @matrix(from_version=smoke_test_versions, to_version=dev_version, bounce_type=["full"])

Review comment:
       When you say "how is this handled", do you mean in the system tests or in "real life"?
   
   I'm handling it in the system tests by not doing rolling bounces at all. If we do them, we'll see the test fail nondeterministically, when it just so happens that an old-version node tries to read a new-version record. Once we release 2.6.0, we'll be able to add a downgrade test from trunk to 2.6.0, and once we release 2.5.1, we can add a downgrade test from trunk to 2.5.1, and also one in the 2.6 branch from 2.6.x to 2.5.1.
   
   In real life, you would see some of the old-versioned threads get exceptions and crash. But it wouldn't be a _huge_ deal, since you can just continue with the rolling bounce until there are no more old-versioned instances, at which point everything is fine. There would be no data corruption or anything.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] guozhangwang commented on pull request #8938: KAFKA-10173: Use SmokeTest for upgrade system tests

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on pull request #8938:
URL: https://github.com/apache/kafka/pull/8938#issuecomment-652591278


   > There's a bit of a chicken-and-egg problem here, since we can't add the downgrade tests _yet_, but what I can do is to create the tickets to both fix the known downgrade issues and then add the relevant downgrade path to the test matrix. Similar to how I couldn't add an upgrade test from 2.1 to trunk, but I filed a ticket with a note that once the ticket is fixed, we should be able to add the missing test parameters.
   
   Sounds good to me!


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vvcephei commented on a change in pull request #8938: KAFKA-10173: Use SmokeTest for upgrade system tests

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #8938:
URL: https://github.com/apache/kafka/pull/8938#discussion_r448581607



##########
File path: streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java
##########
@@ -38,107 +37,128 @@
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.state.Stores;
 import org.apache.kafka.streams.state.WindowStore;
-import org.apache.kafka.test.TestUtils;
 
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
 import java.time.Duration;
 import java.time.Instant;
 import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 
 import static org.apache.kafka.streams.kstream.Suppressed.untilWindowCloses;
 
 public class SmokeTestClient extends SmokeTestUtil {
 
     private final String name;
 
-    private Thread thread;
     private KafkaStreams streams;
     private boolean uncaughtException = false;
-    private boolean started;
-    private boolean closed;
+    private volatile boolean closed;
 
-    public SmokeTestClient(final String name) {
-        super();
-        this.name = name;
+    private static void addShutdownHook(final String name, final Runnable runnable) {
+        if (name != null) {
+            Runtime.getRuntime().addShutdownHook(KafkaThread.nonDaemon(name, runnable));
+        } else {
+            Runtime.getRuntime().addShutdownHook(new Thread(runnable));
+        }
     }
 
-    public boolean started() {
-        return started;
+    private static File tempDirectory() {
+        final String prefix = "kafka-";
+        final File file;
+        try {
+            file = Files.createTempDirectory(prefix).toFile();
+        } catch (final IOException ex) {
+            throw new RuntimeException("Failed to create a temp dir", ex);
+        }
+        file.deleteOnExit();
+
+        addShutdownHook("delete-temp-file-shutdown-hook", () -> {
+            try {
+                Utils.delete(file);
+            } catch (final IOException e) {
+                System.out.println("Error deleting " + file.getAbsolutePath());
+                e.printStackTrace(System.out);
+            }
+        });
+
+        return file;
+    }
+
+    public SmokeTestClient(final String name) {
+        this.name = name;
     }
 
     public boolean closed() {
         return closed;
     }
 
     public void start(final Properties streamsProperties) {
-        streams = createKafkaStreams(streamsProperties);
+        final Topology build = getTopology();
+        streams = new KafkaStreams(build, getStreamsConfig(streamsProperties));
+
+        final CountDownLatch countDownLatch = new CountDownLatch(1);
+        streams.setStateListener((newState, oldState) -> {
+            System.out.printf("%s %s: %s -> %s%n", name, Instant.now(), oldState, newState);
+            if (oldState == KafkaStreams.State.REBALANCING && newState == KafkaStreams.State.RUNNING) {
+                countDownLatch.countDown();
+            }
+
+            if (newState == KafkaStreams.State.NOT_RUNNING) {
+                closed = true;
+            }
+        });
+
         streams.setUncaughtExceptionHandler((t, e) -> {
             System.out.println(name + ": SMOKE-TEST-CLIENT-EXCEPTION");
+            System.out.println(name + ": FATAL: An unexpected exception is encountered on thread " + t + ": " + e);
+            e.printStackTrace(System.out);
             uncaughtException = true;
-            e.printStackTrace();
+            streams.close(Duration.ofSeconds(30));
         });
 
-        Exit.addShutdownHook("streams-shutdown-hook", () -> close());
+        addShutdownHook("streams-shutdown-hook", this::close);
 
-        thread = new Thread(() -> streams.start());
-        thread.start();
+        streams.start();
+        try {
+            if (!countDownLatch.await(1, TimeUnit.MINUTES)) {
+                System.out.println(name + ": SMOKE-TEST-CLIENT-EXCEPTION: Didn't start in one minute");
+            }
+        } catch (final InterruptedException e) {
+            System.out.println(name + ": SMOKE-TEST-CLIENT-EXCEPTION: " + e);
+            e.printStackTrace(System.out);
+        }
+        System.out.println(name + ": SMOKE-TEST-CLIENT-STARTED");
+        System.out.println(name + " started at " + Instant.now());
     }
 
     public void closeAsync() {
         streams.close(Duration.ZERO);
     }
 
     public void close() {
-        streams.close(Duration.ofSeconds(5));
-        // do not remove these printouts since they are needed for health scripts
-        if (!uncaughtException) {
+        final boolean wasClosed = streams.close(Duration.ofMinutes(1));
+
+        if (wasClosed && !uncaughtException) {
             System.out.println(name + ": SMOKE-TEST-CLIENT-CLOSED");
-        }
-        try {
-            thread.join();
-        } catch (final Exception ex) {
-            // do not remove these printouts since they are needed for health scripts
+        } else if (wasClosed) {
             System.out.println(name + ": SMOKE-TEST-CLIENT-EXCEPTION");
-            // ignore
+        } else {
+            System.out.println(name + ": SMOKE-TEST-CLIENT-EXCEPTION: Didn't close");

Review comment:
       Sounds good.
   
   ```suggestion
               System.out.println(name + ": SMOKE-TEST-CLIENT-EXCEPTION: Didn't close in time.");
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vvcephei commented on pull request #8938: KAFKA-10173: Use SmokeTest for upgrade system tests

Posted by GitBox <gi...@apache.org>.
vvcephei commented on pull request #8938:
URL: https://github.com/apache/kafka/pull/8938#issuecomment-652173502


   I had a chat with @ableegoldman earlier today.
   
   As a general strategy, maybe what we can do is aim to repair the non-downgradability where it exists. For example, when we release 2.5.1, then it will become possible to downgrade (with suppression) from 2.6.0 to 2.5.1 even though it's not possible to downgrade from 2.6.0 to 2.5.0.
   
   There's a bit of a chicken-and-egg problem here, since we can't add the downgrade tests _yet_, but what I can do is to create the tickets to both fix the known downgrade issues and then add the relevant downgrade path to the test matrix. Similar to how I couldn't add an upgrade test from 2.1 to trunk, but I filed a ticket with a note that once the ticket is fixed, we should be able to add the missing test parameters.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] guozhangwang commented on a change in pull request #8938: KAFKA-10173: Use SmokeTest for upgrade system tests

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on a change in pull request #8938:
URL: https://github.com/apache/kafka/pull/8938#discussion_r448653507



##########
File path: tests/kafkatest/tests/streams/streams_application_upgrade_test.py
##########
@@ -0,0 +1,297 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import random
+from ducktape.mark import matrix
+from ducktape.tests.test import Test
+from ducktape.utils.util import wait_until
+from kafkatest.services.kafka import KafkaService
+from kafkatest.services.streams import StreamsSmokeTestDriverService, StreamsSmokeTestJobRunnerService
+from kafkatest.services.zookeeper import ZookeeperService
+from kafkatest.version import LATEST_2_2, LATEST_2_3, LATEST_2_4, LATEST_2_5, DEV_VERSION, KafkaVersion
+
+smoke_test_versions = [str(LATEST_2_2), str(LATEST_2_3), str(LATEST_2_4), str(LATEST_2_5)]
+dev_version = [str(DEV_VERSION)]
+
+class StreamsUpgradeTest(Test):
+    """
+    Test upgrading Kafka Streams (all version combination)
+    If metadata was changes, upgrade is more difficult
+    Metadata version was bumped in 0.10.1.0 and
+    subsequently bumped in 2.0.0
+    """
+
+    def __init__(self, test_context):
+        super(StreamsUpgradeTest, self).__init__(test_context)
+        self.topics = {
+            'echo' : { 'partitions': 5 },
+            'data' : { 'partitions': 5 },
+        }
+
+    processed_msg = "processed [0-9]* records"
+    base_version_number = str(DEV_VERSION).split("-")[0]
+
+    def perform_broker_upgrade(self, to_version):
+        self.logger.info("First pass bounce - rolling broker upgrade")
+        for node in self.kafka.nodes:
+            self.kafka.stop_node(node)
+            node.version = KafkaVersion(to_version)
+            self.kafka.start_node(node)
+
+    @matrix(from_version=smoke_test_versions, to_version=dev_version, bounce_type=["full"])

Review comment:
       I was asking about in system tests :) More specifically I'm wondering if we could separate the upgrade path which are "rolling bouncible" from others that are not, e.g. as two separate tests in our python code base. And this can also be used as a cross-reference for our upgrade docs.
   
   Anyways, sine for trunk / 2.7 we would only be rolling bouncible from 2.6 and not from others this is not a big point. We can leave it to another follow-up PR.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org