Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/06/26 16:26:09 UTC

[GitHub] [kafka] vvcephei commented on a change in pull request #8905: KAFKA-10173: Fix suppress changelog binary schema compatibility

vvcephei commented on a change in pull request #8905:
URL: https://github.com/apache/kafka/pull/8905#discussion_r446265559



##########
File path: build.gradle
##########
@@ -97,7 +97,7 @@ ext {
   buildVersionFileName = "kafka-version.properties"
 
   defaultMaxHeapSize = "2g"
-  defaultJvmArgs = ["-Xss4m", "-XX:+UseParallelGC"]
+  defaultJvmArgs = ["-Xss4m"]

Review comment:
       @ijuma, you'll probably want to know about this.
   
   I have no idea why, but one of the new tests in this PR was failing with:
   ```
       java.lang.OutOfMemoryError: Java heap space
           at org.apache.kafka.streams.kstream.internals.FullChangeSerde.decomposeLegacyFormattedArrayIntoChangeArrays(FullChangeSerde.java:82)
           at org.apache.kafka.streams.state.internals.TimeOrderedKeyValueBufferChangelogDeserializationHelper.deserializeV2(TimeOrderedKeyValueBufferChangelogDeserializationHelper.java:90)
           at org.apache.kafka.streams.state.internals.TimeOrderedKeyValueBufferChangelogDeserializationHelper.duckTypeV2(TimeOrderedKeyValueBufferChangelogDeserializationHelper.java:61)
           at org.apache.kafka.streams.state.internals.InMemoryTimeOrderedKeyValueBuffer.restoreBatch(InMemoryTimeOrderedKeyValueBuffer.java:369)
           at org.apache.kafka.streams.state.internals.InMemoryTimeOrderedKeyValueBuffer$$Lambda$284/0x00000001002cb440.restoreBatch(Unknown Source)
           at org.apache.kafka.streams.state.internals.TimeOrderedKeyValueBufferTest.shouldRestoreV3FormatWithV2Header(TimeOrderedKeyValueBufferTest.java:742)
   ```
   
    I captured a flight recording and a heap dump on exit, but everything looked fine, and the heap was only a few megabytes at the time of the crash. I first noticed that if I overrode all the JVM args, the test would pass; through trial and error, I identified this flag as the "cause".
   
    I get an OOME every time with `-XX:+UseParallelGC`, and I've never gotten one without the flag. WDYT about dropping it?
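    
    As an aside, while bisecting the args it can help to confirm which collector the forked test JVM actually got. A minimal diagnostic sketch (plain JDK, not part of this PR; the class name is made up):
    ```java
    import java.lang.management.GarbageCollectorMXBean;
    import java.lang.management.ManagementFactory;
    
    public class PrintActiveGc {
        public static void main(final String[] args) {
            // With -XX:+UseParallelGC this prints "PS Scavenge" and "PS MarkSweep";
            // under the JDK 9+ default it prints the G1 collector names instead.
            for (final GarbageCollectorMXBean gc : ManagementFactory.getGarbageCollectorMXBeans()) {
                System.out.println(gc.getName());
            }
        }
    }
    ```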

##########
File path: streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBufferTest.java
##########
@@ -56,14 +55,13 @@
 import static java.nio.charset.StandardCharsets.UTF_8;
 import static java.util.Arrays.asList;
 import static java.util.Collections.singletonList;
+import static org.apache.kafka.streams.state.internals.InMemoryTimeOrderedKeyValueBuffer.CHANGELOG_HEADERS;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.is;
 import static org.junit.Assert.fail;
 
 @RunWith(Parameterized.class)
 public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<String, String>> {
-    private static final RecordHeaders V_2_CHANGELOG_HEADERS =
-        new RecordHeaders(new Header[] {new RecordHeader("v", new byte[] {(byte) 2})});

Review comment:
       Imported the headers from the production code so that the test will stay current.

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/SmokeTestDriverIntegrationTest.java
##########
@@ -104,10 +104,6 @@ public void shouldWorkWithRebalance() throws InterruptedException {
             clients.add(smokeTestClient);
             smokeTestClient.start(props);
 
-            while (!clients.get(clients.size() - 1).started()) {
-                Thread.sleep(100);
-            }
-

Review comment:
       We don't need this anymore because `start` now blocks until the client is "started".

##########
File path: streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java
##########
@@ -38,107 +37,128 @@
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.state.Stores;
 import org.apache.kafka.streams.state.WindowStore;
-import org.apache.kafka.test.TestUtils;
 
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
 import java.time.Duration;
 import java.time.Instant;
 import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 
 import static org.apache.kafka.streams.kstream.Suppressed.untilWindowCloses;
 
 public class SmokeTestClient extends SmokeTestUtil {
 
     private final String name;
 
-    private Thread thread;
     private KafkaStreams streams;
     private boolean uncaughtException = false;
-    private boolean started;
-    private boolean closed;
+    private volatile boolean closed;
 
-    public SmokeTestClient(final String name) {
-        super();
-        this.name = name;
+    private static void addShutdownHook(final String name, final Runnable runnable) {
+        if (name != null) {
+            Runtime.getRuntime().addShutdownHook(KafkaThread.nonDaemon(name, runnable));
+        } else {
+            Runtime.getRuntime().addShutdownHook(new Thread(runnable));
+        }
     }
 
-    public boolean started() {
-        return started;
+    private static File tempDirectory() {
+        final String prefix = "kafka-";
+        final File file;
+        try {
+            file = Files.createTempDirectory(prefix).toFile();
+        } catch (final IOException ex) {
+            throw new RuntimeException("Failed to create a temp dir", ex);
+        }
+        file.deleteOnExit();
+
+        addShutdownHook("delete-temp-file-shutdown-hook", () -> {
+            try {
+                Utils.delete(file);
+            } catch (final IOException e) {
+                System.out.println("Error deleting " + file.getAbsolutePath());
+                e.printStackTrace(System.out);
+            }
+        });
+
+        return file;
+    }
+
+    public SmokeTestClient(final String name) {
+        this.name = name;
     }
 
     public boolean closed() {
         return closed;
     }
 
     public void start(final Properties streamsProperties) {
-        streams = createKafkaStreams(streamsProperties);
+        final Topology build = getTopology();
+        streams = new KafkaStreams(build, getStreamsConfig(streamsProperties));
+
+        final CountDownLatch countDownLatch = new CountDownLatch(1);
+        streams.setStateListener((newState, oldState) -> {
+            System.out.printf("%s %s: %s -> %s%n", name, Instant.now(), oldState, newState);
+            if (oldState == KafkaStreams.State.REBALANCING && newState == KafkaStreams.State.RUNNING) {
+                countDownLatch.countDown();
+            }
+
+            if (newState == KafkaStreams.State.NOT_RUNNING) {
+                closed = true;
+            }
+        });
+
         streams.setUncaughtExceptionHandler((t, e) -> {
             System.out.println(name + ": SMOKE-TEST-CLIENT-EXCEPTION");
+            System.out.println(name + ": FATAL: An unexpected exception is encountered on thread " + t + ": " + e);
+            e.printStackTrace(System.out);
             uncaughtException = true;
-            e.printStackTrace();
+            streams.close(Duration.ofSeconds(30));
         });
 
-        Exit.addShutdownHook("streams-shutdown-hook", () -> close());
+        addShutdownHook("streams-shutdown-hook", this::close);
 
-        thread = new Thread(() -> streams.start());
-        thread.start();
+        streams.start();
+        try {
+            if (!countDownLatch.await(1, TimeUnit.MINUTES)) {
+                System.out.println(name + ": SMOKE-TEST-CLIENT-EXCEPTION: Didn't start in one minute");
+            }
+        } catch (final InterruptedException e) {
+            System.out.println(name + ": SMOKE-TEST-CLIENT-EXCEPTION: " + e);
+            e.printStackTrace(System.out);
+        }
+        System.out.println(name + ": SMOKE-TEST-CLIENT-STARTED");
+        System.out.println(name + " started at " + Instant.now());
     }
 
     public void closeAsync() {
         streams.close(Duration.ZERO);
     }
 
     public void close() {
-        streams.close(Duration.ofSeconds(5));
-        // do not remove these printouts since they are needed for health scripts
-        if (!uncaughtException) {
+        final boolean wasClosed = streams.close(Duration.ofMinutes(1));
+
+        if (wasClosed && !uncaughtException) {

Review comment:
       Found a missed condition: if the close timed out, there wouldn't be an exception, just a `false` return value.
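    
    For context, the API detail the fix relies on (as I read `KafkaStreams.close(Duration)`): a timeout is reported through the return value rather than an exception, so it has to be checked explicitly, along these lines:
    ```java
    final boolean wasClosed = streams.close(Duration.ofMinutes(1));
    if (!wasClosed) {
        // close() timed out; nothing was thrown, so we have to report it ourselves
        System.out.println(name + ": SMOKE-TEST-CLIENT-EXCEPTION: Didn't close");
    }
    ```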

##########
File path: streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java
##########
@@ -38,107 +37,128 @@
+        streams.start();
+        try {
+            if (!countDownLatch.await(1, TimeUnit.MINUTES)) {
+                System.out.println(name + ": SMOKE-TEST-CLIENT-EXCEPTION: Didn't start in one minute");
+            }
+        } catch (final InterruptedException e) {
+            System.out.println(name + ": SMOKE-TEST-CLIENT-EXCEPTION: " + e);
+            e.printStackTrace(System.out);
+        }
+        System.out.println(name + ": SMOKE-TEST-CLIENT-STARTED");

Review comment:
       A new message we can look for to wait until the instance has completed joining the group.

##########
File path: streams/upgrade-system-tests-22/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java
##########
@@ -0,0 +1,298 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.tests;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.KafkaThread;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.Grouped;
+import org.apache.kafka.streams.kstream.KGroupedStream;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.Produced;
+import org.apache.kafka.streams.kstream.Suppressed.BufferConfig;
+import org.apache.kafka.streams.kstream.TimeWindows;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.streams.state.WindowStore;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.kafka.streams.kstream.Suppressed.untilWindowCloses;
+
+public class SmokeTestClient extends SmokeTestUtil {

Review comment:
       All of these are just copy-pasted from the main module.

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/BufferValue.java
##########
@@ -120,7 +120,7 @@ ByteBuffer serialize(final int endPadding) {
 
         if (oldValue == null) {
             buffer.putInt(NULL_VALUE_SENTINEL);
-        } else if (priorValue == oldValue) {
+        } else if (Arrays.equals(priorValue, oldValue)) {

Review comment:
       This was correct before, since we check equality and enforce identity in the constructor. But `Arrays.equals` is extremely cheap when the arrays are identical, so the explicit identity check was only a micro-optimization, and the equality check reads more clearly.
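    
    A quick self-contained illustration of why the identity check buys almost nothing: `Arrays.equals` starts with a reference comparison and returns immediately when both arguments are the same array.
    ```java
    import java.util.Arrays;
    
    public class IdentityVsEquals {
        public static void main(final String[] args) {
            final byte[] prior = {1, 2, 3};
            final byte[] old = prior; // the constructor enforces identity when the values are equal
    
            System.out.println(prior == old);              // true: identity
            System.out.println(Arrays.equals(prior, old)); // true: short-circuits on the reference check
            System.out.println(Arrays.equals(prior, new byte[] {1, 2, 3})); // true: element-wise scan
        }
    }
    ```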

##########
File path: tests/kafkatest/tests/streams/streams_upgrade_test.py
##########
@@ -201,14 +204,29 @@ def test_simple_upgrade_downgrade(self, from_version, to_version):
         self.zk = ZookeeperService(self.test_context, num_nodes=1)
         self.zk.start()
 
-        self.kafka = KafkaService(self.test_context, num_nodes=1, zk=self.zk, topics=self.topics)
+        self.kafka = KafkaService(self.test_context, num_nodes=1, zk=self.zk, topics={

Review comment:
       A lot of these changes are part of adapting the test to the smoke test app.

##########
File path: streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java
##########
@@ -38,107 +37,128 @@
     public void start(final Properties streamsProperties) {
-        streams = createKafkaStreams(streamsProperties);
+        final Topology build = getTopology();
+        streams = new KafkaStreams(build, getStreamsConfig(streamsProperties));
+
+        final CountDownLatch countDownLatch = new CountDownLatch(1);
+        streams.setStateListener((newState, oldState) -> {
+            System.out.printf("%s %s: %s -> %s%n", name, Instant.now(), oldState, newState);
+            if (oldState == KafkaStreams.State.REBALANCING && newState == KafkaStreams.State.RUNNING) {
+                countDownLatch.countDown();
+            }
+
+            if (newState == KafkaStreams.State.NOT_RUNNING) {
+                closed = true;
+            }
+        });
+
         streams.setUncaughtExceptionHandler((t, e) -> {
             System.out.println(name + ": SMOKE-TEST-CLIENT-EXCEPTION");
+            System.out.println(name + ": FATAL: An unexpected exception is encountered on thread " + t + ": " + e);
+            e.printStackTrace(System.out);
             uncaughtException = true;
-            e.printStackTrace();
+            streams.close(Duration.ofSeconds(30));
         });
 
-        Exit.addShutdownHook("streams-shutdown-hook", () -> close());
+        addShutdownHook("streams-shutdown-hook", this::close);
 
-        thread = new Thread(() -> streams.start());
-        thread.start();
+        streams.start();
+        try {
+            if (!countDownLatch.await(1, TimeUnit.MINUTES)) {
+                System.out.println(name + ": SMOKE-TEST-CLIENT-EXCEPTION: Didn't start in one minute");
+            }
+        } catch (final InterruptedException e) {
+            System.out.println(name + ": SMOKE-TEST-CLIENT-EXCEPTION: " + e);
+            e.printStackTrace(System.out);
+        }
+        System.out.println(name + ": SMOKE-TEST-CLIENT-STARTED");
+        System.out.println(name + " started at " + Instant.now());
     }
 
     public void closeAsync() {
         streams.close(Duration.ZERO);
     }
 
     public void close() {
-        streams.close(Duration.ofSeconds(5));
-        // do not remove these printouts since they are needed for health scripts
-        if (!uncaughtException) {
+        final boolean wasClosed = streams.close(Duration.ofMinutes(1));
+
+        if (wasClosed && !uncaughtException) {
             System.out.println(name + ": SMOKE-TEST-CLIENT-CLOSED");
-        }
-        try {
-            thread.join();
-        } catch (final Exception ex) {
-            // do not remove these printouts since they are needed for health scripts
+        } else if (wasClosed) {
             System.out.println(name + ": SMOKE-TEST-CLIENT-EXCEPTION");
-            // ignore
+        } else {
+            System.out.println(name + ": SMOKE-TEST-CLIENT-EXCEPTION: Didn't close");
         }
     }
 
     private Properties getStreamsConfig(final Properties props) {
         final Properties fullProps = new Properties(props);
         fullProps.put(StreamsConfig.APPLICATION_ID_CONFIG, "SmokeTest");
         fullProps.put(StreamsConfig.CLIENT_ID_CONFIG, "SmokeTest-" + name);
-        fullProps.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 3);
-        fullProps.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 2);
-        fullProps.put(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, 100);
-        fullProps.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
-        fullProps.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3);
-        fullProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-        fullProps.put(ProducerConfig.ACKS_CONFIG, "all");
-        fullProps.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
+        fullProps.put(StreamsConfig.STATE_DIR_CONFIG, tempDirectory().getAbsolutePath());
         fullProps.putAll(props);
         return fullProps;
     }
 
-    private KafkaStreams createKafkaStreams(final Properties props) {

Review comment:
       inlined above.

##########
File path: streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java
##########
@@ -38,107 +37,128 @@
     private Properties getStreamsConfig(final Properties props) {
         final Properties fullProps = new Properties(props);
         fullProps.put(StreamsConfig.APPLICATION_ID_CONFIG, "SmokeTest");
         fullProps.put(StreamsConfig.CLIENT_ID_CONFIG, "SmokeTest-" + name);
-        fullProps.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 3);
-        fullProps.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 2);
-        fullProps.put(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, 100);
-        fullProps.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
-        fullProps.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3);
-        fullProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-        fullProps.put(ProducerConfig.ACKS_CONFIG, "all");

Review comment:
       I moved all these to the system test `propFile()` definition.

##########
File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/FullChangeSerdeTest.java
##########
@@ -19,17 +19,46 @@
 import org.apache.kafka.common.serialization.Serdes;
 import org.junit.Test;
 
+import java.nio.ByteBuffer;
+
 import static org.hamcrest.CoreMatchers.nullValue;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.core.Is.is;
 
 public class FullChangeSerdeTest {
     private final FullChangeSerde<String> serde = FullChangeSerde.wrap(Serdes.String());
 
+    /**
+     * We used to serialize a Change into a single byte[]. Now, we don't anymore, but we still keep this logic here
+     * so that we can produce the legacy format to test that we can still deserialize it.
+     */
+    private static byte[] mergeChangeArraysIntoSingleLegacyFormattedArray(final Change<byte[]> serialChange) {

Review comment:
       Moved from FullChangeSerde because it is only used in this test.
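    
    For readers who don't want to open the test, a rough sketch of the legacy single-array layout this helper reproduces, assuming it is a pair of length-prefixed arrays with -1 encoding null (the moved method in FullChangeSerdeTest is authoritative):
    ```java
    import java.nio.ByteBuffer;
    
    final class LegacyChangeFormat {
        // Sketch only; see mergeChangeArraysIntoSingleLegacyFormattedArray in FullChangeSerdeTest.
        static byte[] merge(final byte[] oldValue, final byte[] newValue) {
            final int oldSize = oldValue == null ? -1 : oldValue.length;
            final int newSize = newValue == null ? -1 : newValue.length;
            final ByteBuffer buffer = ByteBuffer.allocate(8 + Math.max(0, oldSize) + Math.max(0, newSize));
            buffer.putInt(oldSize);                  // -1 marks a null old value
            if (oldValue != null) {
                buffer.put(oldValue);
            }
            buffer.putInt(newSize);                  // -1 marks a null new value
            if (newValue != null) {
                buffer.put(newValue);
            }
            return buffer.array();
        }
    }
    ```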

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java
##########
@@ -54,14 +56,17 @@
 import java.util.function.Supplier;
 
 import static java.util.Objects.requireNonNull;
+import static org.apache.kafka.streams.state.internals.TimeOrderedKeyValueBufferChangelogDeserializationHelper.deserializeV3;
+import static org.apache.kafka.streams.state.internals.TimeOrderedKeyValueBufferChangelogDeserializationHelper.duckTypeV2;
 
 public final class InMemoryTimeOrderedKeyValueBuffer<K, V> implements TimeOrderedKeyValueBuffer<K, V> {
     private static final BytesSerializer KEY_SERIALIZER = new BytesSerializer();
     private static final ByteArraySerializer VALUE_SERIALIZER = new ByteArraySerializer();
-    private static final RecordHeaders V_1_CHANGELOG_HEADERS =
-        new RecordHeaders(new Header[] {new RecordHeader("v", new byte[] {(byte) 1})});
-    private static final RecordHeaders V_2_CHANGELOG_HEADERS =
-        new RecordHeaders(new Header[] {new RecordHeader("v", new byte[] {(byte) 2})});
+    private static final byte[] V_1_CHANGELOG_HEADER_VALUE = {(byte) 1};
+    private static final byte[] V_2_CHANGELOG_HEADER_VALUE = {(byte) 2};
+    private static final byte[] V_3_CHANGELOG_HEADER_VALUE = {(byte) 3};
+    static final RecordHeaders CHANGELOG_HEADERS =
+        new RecordHeaders(new Header[] {new RecordHeader("v", V_3_CHANGELOG_HEADER_VALUE)});

Review comment:
       We don't need to store the whole RecordHeaders for the old versions, just the actual version flag.
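    
    Concretely, the restore-time check goes from comparing `Header` objects to comparing the raw flag bytes; adapted from the diffs in this PR (the null check is my addition, for records that predate the header):
    ```java
    // before:
    // V_2_CHANGELOG_HEADERS.lastHeader("v").equals(record.headers().lastHeader("v"))
    // after:
    final Header versionHeader = record.headers().lastHeader("v");
    final boolean isV2 = versionHeader != null
        && Arrays.equals(versionHeader.value(), V_2_CHANGELOG_HEADER_VALUE);
    ```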

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/FullChangeSerde.java
##########
@@ -68,33 +68,6 @@ private FullChangeSerde(final Serde<T> inner) {
         return new Change<>(newValue, oldValue);
     }
 
-    /**
-     * We used to serialize a Change into a single byte[]. Now, we don't anymore, but we still keep this logic here
-     * so that we can produce the legacy format to test that we can still deserialize it.
-     */
-    public static byte[] mergeChangeArraysIntoSingleLegacyFormattedArray(final Change<byte[]> serialChange) {

Review comment:
       Only used in the test now, so I moved it.

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java
##########
@@ -361,26 +366,20 @@ private void restoreBatch(final Collection<ConsumerRecord<byte[], byte[]>> batch
                             contextualRecord.recordContext()
                         )
                     );
-                } else if (V_2_CHANGELOG_HEADERS.lastHeader("v").equals(record.headers().lastHeader("v"))) {
-                    // in this case, the changelog value is a serialized BufferValue
+                } else if (Arrays.equals(versionHeader.value(), V_2_CHANGELOG_HEADER_VALUE)) {
+
+                    final DeserializationResult deserializationResult = duckTypeV2(record, key);

Review comment:
       See the comment on this method for why we need to duck-type version 2. I pulled these deserializations into a helper class because all the extra branches pushed our cyclomatic complexity over the limit.
   
   But I kept the first two branches here because they aren't pure functions. They perform a lookup in the buffer itself as part of converting the old format.
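    
    Loosely, the duck-typing amounts to the sketch below (illustration only: the real helper may probe the byte layout rather than rely on exceptions; the names come from this diff):
    ```java
    static DeserializationResult duckTypeV2(final ConsumerRecord<byte[], byte[]> record, final Bytes key) {
        // 2.4.0 wrote v3-structured values under the v2 header, so a "v2" record may
        // really be either format: try the true v2 layout first, and fall back to
        // interpreting the bytes as v3 if they don't conform.
        try {
            return deserializeV2(record, key);
        } catch (final RuntimeException probablyV3InDisguise) {
            return deserializeV3(record, key);
        }
    }
    ```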

##########
File path: streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBufferTest.java
##########
@@ -372,12 +370,14 @@ public void shouldRestoreOldFormat() {
 
         context.setRecordContext(new ProcessorRecordContext(0, 0, 0, "", null));
 
-        final FullChangeSerde<String> serializer = FullChangeSerde.wrap(Serdes.String());
+        // These serialized formats were captured by running version 2.1 code.
+        // They verify that an upgrade from 2.1 will work.
+        // Do not change them.
+        final String toDeleteBinaryValue = "0000000000000000FFFFFFFF00000006646F6F6D6564";
+        final String asdfBinaryValue = "0000000000000002FFFFFFFF0000000471776572";
+        final String zxcvBinaryValue1 = "00000000000000010000000870726576696F757300000005656F34696D";
+        final String zxcvBinaryValue2 = "000000000000000100000005656F34696D000000046E657874";

Review comment:
       This was one of my major findings in KAFKA-10173. Because the test was serializing the "old versions" using code shared with the current logic, we could not detect when we accidentally changed the current serialization logic without bumping the version number.
   
   By instead testing against fixed pre-serialized data, we should be a lot safer.
   
   I took inspiration from the way that Karsten reported the observed serialized data in the bug report. Hex-encoding the binary data makes the tests more readable than a long array of byte literals.
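    
    For example, a small decoder along these lines (any equivalent helper works; the one in the PR may differ) turns the captured strings back into the original changelog bytes. The first string, for instance, ends in the UTF-8 bytes for "doomed":
    ```java
    static byte[] hexToBytes(final String hex) {
        final byte[] bytes = new byte[hex.length() / 2];
        for (int i = 0; i < bytes.length; i++) {
            // parse each two-character pair as one unsigned hex byte
            bytes[i] = (byte) Integer.parseInt(hex.substring(2 * i, 2 * i + 2), 16);
        }
        return bytes;
    }
    ```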

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java
##########
@@ -481,8 +480,7 @@ public void put(final long time,
         final BufferValue buffered = getBuffered(serializedKey);
         final byte[] serializedPriorValue;
         if (buffered == null) {
-            final V priorValue = value.oldValue;
-            serializedPriorValue = (priorValue == null) ? null : valueSerde.innerSerde().serializer().serialize(changelogTopic, priorValue);
+            serializedPriorValue = serialChange.oldValue;

Review comment:
       If you look a few lines up, you'll see that we just serialized the "old value", so we don't need to serialize it again here.

##########
File path: tests/kafkatest/tests/streams/streams_upgrade_test.py
##########
@@ -37,6 +37,9 @@
 # can be replaced with metadata_2_versions
 backward_compatible_metadata_2_versions = [str(LATEST_0_10_2), str(LATEST_0_11_0), str(LATEST_1_0), str(LATEST_1_1)]
 metadata_3_or_higher_versions = [str(LATEST_2_0), str(LATEST_2_1), str(LATEST_2_2), str(LATEST_2_3), str(LATEST_2_4), str(LATEST_2_5), str(DEV_VERSION)]
+smoke_test_versions = [str(LATEST_2_2), str(LATEST_2_3), str(LATEST_2_4), str(LATEST_2_5)]

Review comment:
       See KAFKA-10203 for why I couldn't go back further than 2.2.

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java
##########
@@ -361,26 +366,20 @@ private void restoreBatch(final Collection<ConsumerRecord<byte[], byte[]>> batch
                             contextualRecord.recordContext()
                         )
                     );
-                } else if (V_2_CHANGELOG_HEADERS.lastHeader("v").equals(record.headers().lastHeader("v"))) {
-                    // in this case, the changelog value is a serialized BufferValue
+                } else if (Arrays.equals(versionHeader.value(), V_2_CHANGELOG_HEADER_VALUE)) {
+
+                    final DeserializationResult deserializationResult = duckTypeV2(record, key);
+                    cleanPut(deserializationResult.time(), deserializationResult.key(), deserializationResult.bufferValue());
+
+                } else if (Arrays.equals(versionHeader.value(), V_3_CHANGELOG_HEADER_VALUE)) {

Review comment:
       This is the version bump we should have done in 2.4.0. I'll backport this fix to the 2.4 branch.

##########
File path: streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java
##########
@@ -38,107 +37,128 @@
 public class SmokeTestClient extends SmokeTestUtil {
 
     private final String name;
 
-    private Thread thread;
     private KafkaStreams streams;
     private boolean uncaughtException = false;
-    private boolean started;
-    private boolean closed;
+    private volatile boolean closed;
 
-    public SmokeTestClient(final String name) {
-        super();
-        this.name = name;
+    private static void addShutdownHook(final String name, final Runnable runnable) {

Review comment:
       I inlined these utilities to make this class more "portable", i.e., so that we can copy-paste it into the upgrade-test modules without dragging in a bunch of extra dependencies.

##########
File path: streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java
##########
@@ -38,107 +37,128 @@
     public void start(final Properties streamsProperties) {
-        streams = createKafkaStreams(streamsProperties);
+        final Topology build = getTopology();

Review comment:
       This was another bug I happened to notice while scrutinizing this system test. `createKafkaStreams` was registering a state listener and exception handler. But the next line here was overriding the exception handler, so the one registered in `createKafkaStreams` was getting ignored. I noticed it because I registered a state listener here, which also caused the one registered in `createKafkaStreams` to get ignored.
   
    Inlining solves this problem, and since `createKafkaStreams` had only one caller, the indirection was needless anyway.
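    
    To spell out the overriding (hypothetical handler names): `KafkaStreams` holds a single uncaught-exception handler reference, so the last registration wins.
    ```java
    streams.setUncaughtExceptionHandler(handlerFromCreateKafkaStreams); // was set inside createKafkaStreams
    streams.setUncaughtExceptionHandler(handlerFromStart);              // replaces it; the first never fires
    ```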

##########
File path: tests/kafkatest/tests/streams/streams_upgrade_test.py
##########
@@ -189,8 +192,8 @@ def test_upgrade_downgrade_brokers(self, from_version, to_version):
         processor.stop()
         processor.node.account.ssh_capture("grep SMOKE-TEST-CLIENT-CLOSED %s" % processor.STDOUT_FILE, allow_fail=False)
 
-    @matrix(from_version=metadata_2_versions, to_version=metadata_2_versions)
-    def test_simple_upgrade_downgrade(self, from_version, to_version):
+    @matrix(from_version=smoke_test_versions, to_version=dev_version)

Review comment:
       We were previously not testing 2.0+ _at all_. After rewriting this as a smoke test, it only applies to 2.2+. I also figured it makes more sense just to test upgrades to the current branch, rather than testing cross-upgrades between every pair of versions.

##########
File path: streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java
##########
@@ -75,7 +75,7 @@ public void process(final Object key, final Object value) {
 
                     @Override
                     public void close() {
-                        System.out.printf("Close processor for task %s", context().taskId());
+                        System.out.printf("Close processor for task %s%n", context().taskId());

Review comment:
       Oops! I noticed the lack of a newline in the output. It didn't matter for the tests because the greps aren't bounded by line.
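    
    A two-line illustration, with `taskId` standing in for `context().taskId()`:
    ```java
    System.out.printf("Close processor for task %s", taskId);   // the next printout runs onto the same line
    System.out.printf("Close processor for task %s%n", taskId); // %n appends the platform line separator
    ```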

##########
File path: tests/kafkatest/services/streams.py
##########
@@ -305,23 +305,62 @@ def start_node(self, node):
 class StreamsSmokeTestBaseService(StreamsTestBaseService):
     """Base class for Streams Smoke Test services providing some common settings and functionality"""
 
-    def __init__(self, test_context, kafka, command, processing_guarantee = 'at_least_once', num_threads = 3):
+    def __init__(self, test_context, kafka, command, processing_guarantee = 'at_least_once', num_threads = 3, replication_factor = 3):

Review comment:
       Adding a parameter so that we can get away with just one broker in the upgrade test.

##########
File path: tests/kafkatest/services/streams.py
##########
@@ -305,23 +305,62 @@ def start_node(self, node):
 class StreamsSmokeTestBaseService(StreamsTestBaseService):
     """Base class for Streams Smoke Test services providing some common settings and functionality"""
 
-    def __init__(self, test_context, kafka, command, processing_guarantee = 'at_least_once', num_threads = 3):
+    def __init__(self, test_context, kafka, command, processing_guarantee = 'at_least_once', num_threads = 3, replication_factor = 3):
         super(StreamsSmokeTestBaseService, self).__init__(test_context,
                                                           kafka,
                                                           "org.apache.kafka.streams.tests.StreamsSmokeTest",
                                                           command)
         self.NUM_THREADS = num_threads
         self.PROCESSING_GUARANTEE = processing_guarantee
+        self.KAFKA_STREAMS_VERSION = ""
+        self.UPGRADE_FROM = None
+        self.REPLICATION_FACTOR = replication_factor
+
+    def set_version(self, kafka_streams_version):
+        self.KAFKA_STREAMS_VERSION = kafka_streams_version
+
+    def set_upgrade_from(self, upgrade_from):
+        self.UPGRADE_FROM = upgrade_from

Review comment:
       Added for the upgrade test.

##########
File path: tests/kafkatest/services/streams.py
##########
@@ -305,23 +305,62 @@ def start_node(self, node):
 class StreamsSmokeTestBaseService(StreamsTestBaseService):
     """Base class for Streams Smoke Test services providing some common settings and functionality"""
 
-    def __init__(self, test_context, kafka, command, processing_guarantee = 'at_least_once', num_threads = 3):
+    def __init__(self, test_context, kafka, command, processing_guarantee = 'at_least_once', num_threads = 3, replication_factor = 3):
         super(StreamsSmokeTestBaseService, self).__init__(test_context,
                                                           kafka,
                                                           "org.apache.kafka.streams.tests.StreamsSmokeTest",
                                                           command)
         self.NUM_THREADS = num_threads
         self.PROCESSING_GUARANTEE = processing_guarantee
+        self.KAFKA_STREAMS_VERSION = ""
+        self.UPGRADE_FROM = None
+        self.REPLICATION_FACTOR = replication_factor
+
+    def set_version(self, kafka_streams_version):
+        self.KAFKA_STREAMS_VERSION = kafka_streams_version
+
+    def set_upgrade_from(self, upgrade_from):
+        self.UPGRADE_FROM = upgrade_from
 
     def prop_file(self):
         properties = {streams_property.STATE_DIR: self.PERSISTENT_ROOT,
                       streams_property.KAFKA_SERVERS: self.kafka.bootstrap_servers(),
                       streams_property.PROCESSING_GUARANTEE: self.PROCESSING_GUARANTEE,
-                      streams_property.NUM_THREADS: self.NUM_THREADS}
+                      streams_property.NUM_THREADS: self.NUM_THREADS,
+                      "replication.factor": self.REPLICATION_FACTOR,
+                      "num.standby.replicas": 2,
+                      "buffered.records.per.partition": 100,
+                      "commit.interval.ms": 1000,
+                      "auto.offset.reset": "earliest",
+                      "acks": "all"}

Review comment:
       Moved from the Java code so that all the configs can be defined together.

##########
File path: tests/kafkatest/tests/streams/streams_upgrade_test.py
##########
@@ -349,56 +370,42 @@ def get_version_string(self, version):
     def start_all_nodes_with(self, version):

Review comment:
       I refactored this method to start all the nodes concurrently, rather than one at a time. We still do a rolling upgrade, but there's no need to do a rolling startup.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org