You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/06/19 22:14:19 UTC

[GitHub] [kafka] vvcephei opened a new pull request #8905: KAFKA-10173: Directly use Arrays.equals for version comparison

vvcephei opened a new pull request #8905:
URL: https://github.com/apache/kafka/pull/8905


   Instead of relying on RecordHeader.equals, it is more reliable to use Arrays.equals
   on the header values.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vvcephei commented on a change in pull request #8905: KAFKA-10173: Fix suppress changelog binary schema compatibility

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #8905:
URL: https://github.com/apache/kafka/pull/8905#discussion_r446365992



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java
##########
@@ -258,34 +263,43 @@ private void logValue(final Bytes key, final BufferKey bufferKey, final BufferVa
         final int sizeOfBufferTime = Long.BYTES;
         final ByteBuffer buffer = value.serialize(sizeOfBufferTime);
         buffer.putLong(bufferKey.time());
-
+        final byte[] array = buffer.array();
         ((RecordCollector.Supplier) context).recordCollector().send(
-                changelogTopic,
-                key,
-                buffer.array(),
-                V_2_CHANGELOG_HEADERS,
-                partition,
-                null,
-                KEY_SERIALIZER,
-                VALUE_SERIALIZER
+            changelogTopic,
+            key,
+            array,
+            CHANGELOG_HEADERS,
+            partition,
+            null,
+            KEY_SERIALIZER,
+            VALUE_SERIALIZER
         );
     }
 
     private void logTombstone(final Bytes key) {
         ((RecordCollector.Supplier) context).recordCollector().send(
-                changelogTopic,
-                key,
-                null,
-                null,
-                partition,
-                null,
-                KEY_SERIALIZER,
-                VALUE_SERIALIZER
+            changelogTopic,
+            key,
+            null,
+            null,

Review comment:
       I remember considering this when I added the first version header. The reason I didn't is that, since the initial version didn't have any headers, even if we change the tombstone format in the future, we'll always have to interpret a "no header, null value" record as being a "legacy format" tombstone, just like we have to interpret a "no header, non-null value" as being a "legacy format" data record.
   
   You can think of "no header" as indicating "version 0". Since we haven't changed the format of tombstones _yet_, there's no value in adding a "version 1" flag. We should just wait until we do need to make such a change (if ever).




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] chia7712 commented on a change in pull request #8905: KAFKA-10173: Directly use Arrays.equals for version comparison

Posted by GitBox <gi...@apache.org>.
chia7712 commented on a change in pull request #8905:
URL: https://github.com/apache/kafka/pull/8905#discussion_r443184029



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java
##########
@@ -58,10 +59,12 @@
 public final class InMemoryTimeOrderedKeyValueBuffer<K, V> implements TimeOrderedKeyValueBuffer<K, V> {
     private static final BytesSerializer KEY_SERIALIZER = new BytesSerializer();
     private static final ByteArraySerializer VALUE_SERIALIZER = new ByteArraySerializer();
+    private static final byte[] V_1_CHANGELOG_HEADER_VALUE = {(byte) 1};
     private static final RecordHeaders V_1_CHANGELOG_HEADERS =

Review comment:
       my bad. The unused variable is V_1_CHANGELOG_HEADERS rather than V_1_CHANGELOG_HEADER_VALUE




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vvcephei commented on pull request #8905: KAFKA-10173: Fix suppress changelog binary schema compatibility

Posted by GitBox <gi...@apache.org>.
vvcephei commented on pull request #8905:
URL: https://github.com/apache/kafka/pull/8905#issuecomment-650355203


   Retest this please


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] guozhangwang commented on a change in pull request #8905: KAFKA-10173: Fix suppress changelog binary schema compatibility

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on a change in pull request #8905:
URL: https://github.com/apache/kafka/pull/8905#discussion_r446343541



##########
File path: tests/kafkatest/tests/streams/streams_upgrade_test.py
##########
@@ -189,8 +192,8 @@ def test_upgrade_downgrade_brokers(self, from_version, to_version):
         processor.stop()
         processor.node.account.ssh_capture("grep SMOKE-TEST-CLIENT-CLOSED %s" % processor.STDOUT_FILE, allow_fail=False)
 
-    @matrix(from_version=metadata_2_versions, to_version=metadata_2_versions)
-    def test_simple_upgrade_downgrade(self, from_version, to_version):
+    @matrix(from_version=smoke_test_versions, to_version=dev_version)

Review comment:
       +1, I think this is a great find.

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java
##########
@@ -361,26 +366,20 @@ private void restoreBatch(final Collection<ConsumerRecord<byte[], byte[]>> batch
                             contextualRecord.recordContext()
                         )
                     );
-                } else if (V_2_CHANGELOG_HEADERS.lastHeader("v").equals(record.headers().lastHeader("v"))) {
-                    // in this case, the changelog value is a serialized BufferValue
+                } else if (Arrays.equals(versionHeader.value(), V_2_CHANGELOG_HEADER_VALUE)) {
+
+                    final DeserializationResult deserializationResult = duckTypeV2(record, key);

Review comment:
       Could you clarify which comment are you referring to? I did not see any comments for the "restoreBatch" method..

##########
File path: streams/upgrade-system-tests-25/src/test/java/org/apache/kafka/streams/tests/StreamsSmokeTest.java
##########
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.tests;
+
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.StreamsConfig;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.UUID;
+
+import static org.apache.kafka.streams.tests.SmokeTestDriver.generate;
+import static org.apache.kafka.streams.tests.SmokeTestDriver.generatePerpetually;
+
+public class StreamsSmokeTest {

Review comment:
       I'm assuming 22..25 client / drive code are all copy-pastes here so I skipped reviewing them. LMK if they aren't.

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBufferChangelogDeserializationHelper.java
##########
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.kstream.internals.Change;
+import org.apache.kafka.streams.kstream.internals.FullChangeSerde;
+
+import java.nio.ByteBuffer;
+
+import static java.util.Objects.requireNonNull;
+
+final class TimeOrderedKeyValueBufferChangelogDeserializationHelper {
+    private TimeOrderedKeyValueBufferChangelogDeserializationHelper() {}
+
+    static final class DeserializationResult {
+        private final long time;
+        private final Bytes key;
+        private final BufferValue bufferValue;
+
+        private DeserializationResult(final long time, final Bytes key, final BufferValue bufferValue) {
+            this.time = time;
+            this.key = key;
+            this.bufferValue = bufferValue;
+        }
+
+        long time() {
+            return time;
+        }
+
+        Bytes key() {
+            return key;
+        }
+
+        BufferValue bufferValue() {
+            return bufferValue;
+        }
+    }
+
+
+    static DeserializationResult duckTypeV2(final ConsumerRecord<byte[], byte[]> record, final Bytes key) {
+        DeserializationResult deserializationResult = null;
+        RuntimeException v2DeserializationException = null;
+        RuntimeException v3DeserializationException = null;
+        try {
+            deserializationResult = deserializeV2(record, key);
+        } catch (final RuntimeException e) {
+            v2DeserializationException = e;
+        }
+        // versions 2.4.0, 2.4.1, and 2.5.0 would have erroneously encoded a V3 record with the
+        // V2 header, so we'll try duck-typing to see if this is decodable as V3
+        if (deserializationResult == null) {
+            try {
+                deserializationResult = deserializeV3(record, key);
+            } catch (final RuntimeException e) {
+                v3DeserializationException = e;
+            }
+        }
+
+        if (deserializationResult == null) {
+            // ok, it wasn't V3 either. Throw both exceptions:
+            final RuntimeException exception =
+                new RuntimeException("Couldn't deserialize record as v2 or v3: " + record,
+                                     v2DeserializationException);
+            exception.addSuppressed(v3DeserializationException);
+            throw exception;
+        }
+        return deserializationResult;
+    }
+
+    private static DeserializationResult deserializeV2(final ConsumerRecord<byte[], byte[]> record,

Review comment:
       Some docs, either here or directly inside `InMemoryTimeOrderedKeyValueBuffer.java` explaining the format difference would help a lot. You can see some examples like `object GroupMetadataManager`

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java
##########
@@ -258,34 +263,43 @@ private void logValue(final Bytes key, final BufferKey bufferKey, final BufferVa
         final int sizeOfBufferTime = Long.BYTES;
         final ByteBuffer buffer = value.serialize(sizeOfBufferTime);
         buffer.putLong(bufferKey.time());
-
+        final byte[] array = buffer.array();
         ((RecordCollector.Supplier) context).recordCollector().send(
-                changelogTopic,
-                key,
-                buffer.array(),
-                V_2_CHANGELOG_HEADERS,
-                partition,
-                null,
-                KEY_SERIALIZER,
-                VALUE_SERIALIZER
+            changelogTopic,
+            key,
+            array,
+            CHANGELOG_HEADERS,
+            partition,
+            null,
+            KEY_SERIALIZER,
+            VALUE_SERIALIZER
         );
     }
 
     private void logTombstone(final Bytes key) {
         ((RecordCollector.Supplier) context).recordCollector().send(
-                changelogTopic,
-                key,
-                null,
-                null,
-                partition,
-                null,
-                KEY_SERIALIZER,
-                VALUE_SERIALIZER
+            changelogTopic,
+            key,
+            null,
+            null,

Review comment:
       I'm just thinking, maybe we should encode headers to tombstones too in case in the future we changed the semantics of tombstones?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vvcephei commented on a change in pull request #8905: KAFKA-10173: Fix suppress changelog binary schema compatibility

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #8905:
URL: https://github.com/apache/kafka/pull/8905#discussion_r446265559



##########
File path: build.gradle
##########
@@ -97,7 +97,7 @@ ext {
   buildVersionFileName = "kafka-version.properties"
 
   defaultMaxHeapSize = "2g"
-  defaultJvmArgs = ["-Xss4m", "-XX:+UseParallelGC"]
+  defaultJvmArgs = ["-Xss4m"]

Review comment:
       @ijuma , you'll probably want to know about this.
   
   I have no idea why, but one of the new tests in this PR was failing with:
   ```
       java.lang.OutOfMemoryError: Java heap space
           at org.apache.kafka.streams.kstream.internals.FullChangeSerde.decomposeLegacyFormattedArrayIntoChangeArrays(FullChangeSerde.java:82)
           at org.apache.kafka.streams.state.internals.TimeOrderedKeyValueBufferChangelogDeserializationHelper.deserializeV2(TimeOrderedKeyValueBufferChangelogDeserializationHelper.java:90)
           at org.apache.kafka.streams.state.internals.TimeOrderedKeyValueBufferChangelogDeserializationHelper.duckTypeV2(TimeOrderedKeyValueBufferChangelogDeserializationHelper.java:61)
           at org.apache.kafka.streams.state.internals.InMemoryTimeOrderedKeyValueBuffer.restoreBatch(InMemoryTimeOrderedKeyValueBuffer.java:369)
           at org.apache.kafka.streams.state.internals.InMemoryTimeOrderedKeyValueBuffer$$Lambda$284/0x00000001002cb440.restoreBatch(Unknown Source)
           at org.apache.kafka.streams.state.internals.TimeOrderedKeyValueBufferTest.shouldRestoreV3FormatWithV2Header(TimeOrderedKeyValueBufferTest.java:742)
   ```
   
   I captured a flight recording and a heap dump on exit, but everything looked fine, and the heap was only a few megs at the time of the crash. I noticed first that if I just overrode all the jvm args, the test would pass, and through trial and error, I identified this one as the "cause".
   
   I get an OOMe every time with `-XX:+UseParallelGC` and I've never gotten it without the flag. WDYT about dropping it?

##########
File path: streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBufferTest.java
##########
@@ -56,14 +55,13 @@
 import static java.nio.charset.StandardCharsets.UTF_8;
 import static java.util.Arrays.asList;
 import static java.util.Collections.singletonList;
+import static org.apache.kafka.streams.state.internals.InMemoryTimeOrderedKeyValueBuffer.CHANGELOG_HEADERS;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.is;
 import static org.junit.Assert.fail;
 
 @RunWith(Parameterized.class)
 public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<String, String>> {
-    private static final RecordHeaders V_2_CHANGELOG_HEADERS =
-        new RecordHeaders(new Header[] {new RecordHeader("v", new byte[] {(byte) 2})});

Review comment:
       imported the headers from the production code, so that it'll stay current.

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/SmokeTestDriverIntegrationTest.java
##########
@@ -104,10 +104,6 @@ public void shouldWorkWithRebalance() throws InterruptedException {
             clients.add(smokeTestClient);
             smokeTestClient.start(props);
 
-            while (!clients.get(clients.size() - 1).started()) {
-                Thread.sleep(100);
-            }
-

Review comment:
       Don't need this anymore because `start` blocks until it's "started" now.

##########
File path: streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java
##########
@@ -38,107 +37,128 @@
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.state.Stores;
 import org.apache.kafka.streams.state.WindowStore;
-import org.apache.kafka.test.TestUtils;
 
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
 import java.time.Duration;
 import java.time.Instant;
 import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 
 import static org.apache.kafka.streams.kstream.Suppressed.untilWindowCloses;
 
 public class SmokeTestClient extends SmokeTestUtil {
 
     private final String name;
 
-    private Thread thread;
     private KafkaStreams streams;
     private boolean uncaughtException = false;
-    private boolean started;
-    private boolean closed;
+    private volatile boolean closed;
 
-    public SmokeTestClient(final String name) {
-        super();
-        this.name = name;
+    private static void addShutdownHook(final String name, final Runnable runnable) {
+        if (name != null) {
+            Runtime.getRuntime().addShutdownHook(KafkaThread.nonDaemon(name, runnable));
+        } else {
+            Runtime.getRuntime().addShutdownHook(new Thread(runnable));
+        }
     }
 
-    public boolean started() {
-        return started;
+    private static File tempDirectory() {
+        final String prefix = "kafka-";
+        final File file;
+        try {
+            file = Files.createTempDirectory(prefix).toFile();
+        } catch (final IOException ex) {
+            throw new RuntimeException("Failed to create a temp dir", ex);
+        }
+        file.deleteOnExit();
+
+        addShutdownHook("delete-temp-file-shutdown-hook", () -> {
+            try {
+                Utils.delete(file);
+            } catch (final IOException e) {
+                System.out.println("Error deleting " + file.getAbsolutePath());
+                e.printStackTrace(System.out);
+            }
+        });
+
+        return file;
+    }
+
+    public SmokeTestClient(final String name) {
+        this.name = name;
     }
 
     public boolean closed() {
         return closed;
     }
 
     public void start(final Properties streamsProperties) {
-        streams = createKafkaStreams(streamsProperties);
+        final Topology build = getTopology();
+        streams = new KafkaStreams(build, getStreamsConfig(streamsProperties));
+
+        final CountDownLatch countDownLatch = new CountDownLatch(1);
+        streams.setStateListener((newState, oldState) -> {
+            System.out.printf("%s %s: %s -> %s%n", name, Instant.now(), oldState, newState);
+            if (oldState == KafkaStreams.State.REBALANCING && newState == KafkaStreams.State.RUNNING) {
+                countDownLatch.countDown();
+            }
+
+            if (newState == KafkaStreams.State.NOT_RUNNING) {
+                closed = true;
+            }
+        });
+
         streams.setUncaughtExceptionHandler((t, e) -> {
             System.out.println(name + ": SMOKE-TEST-CLIENT-EXCEPTION");
+            System.out.println(name + ": FATAL: An unexpected exception is encountered on thread " + t + ": " + e);
+            e.printStackTrace(System.out);
             uncaughtException = true;
-            e.printStackTrace();
+            streams.close(Duration.ofSeconds(30));
         });
 
-        Exit.addShutdownHook("streams-shutdown-hook", () -> close());
+        addShutdownHook("streams-shutdown-hook", this::close);
 
-        thread = new Thread(() -> streams.start());
-        thread.start();
+        streams.start();
+        try {
+            if (!countDownLatch.await(1, TimeUnit.MINUTES)) {
+                System.out.println(name + ": SMOKE-TEST-CLIENT-EXCEPTION: Didn't start in one minute");
+            }
+        } catch (final InterruptedException e) {
+            System.out.println(name + ": SMOKE-TEST-CLIENT-EXCEPTION: " + e);
+            e.printStackTrace(System.out);
+        }
+        System.out.println(name + ": SMOKE-TEST-CLIENT-STARTED");
+        System.out.println(name + " started at " + Instant.now());
     }
 
     public void closeAsync() {
         streams.close(Duration.ZERO);
     }
 
     public void close() {
-        streams.close(Duration.ofSeconds(5));
-        // do not remove these printouts since they are needed for health scripts
-        if (!uncaughtException) {
+        final boolean wasClosed = streams.close(Duration.ofMinutes(1));
+
+        if (wasClosed && !uncaughtException) {

Review comment:
       Found a missed condition, if the close timed out, there wouldn't be an exception, just a false return value.

##########
File path: streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java
##########
@@ -38,107 +37,128 @@
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.state.Stores;
 import org.apache.kafka.streams.state.WindowStore;
-import org.apache.kafka.test.TestUtils;
 
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
 import java.time.Duration;
 import java.time.Instant;
 import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 
 import static org.apache.kafka.streams.kstream.Suppressed.untilWindowCloses;
 
 public class SmokeTestClient extends SmokeTestUtil {
 
     private final String name;
 
-    private Thread thread;
     private KafkaStreams streams;
     private boolean uncaughtException = false;
-    private boolean started;
-    private boolean closed;
+    private volatile boolean closed;
 
-    public SmokeTestClient(final String name) {
-        super();
-        this.name = name;
+    private static void addShutdownHook(final String name, final Runnable runnable) {
+        if (name != null) {
+            Runtime.getRuntime().addShutdownHook(KafkaThread.nonDaemon(name, runnable));
+        } else {
+            Runtime.getRuntime().addShutdownHook(new Thread(runnable));
+        }
     }
 
-    public boolean started() {
-        return started;
+    private static File tempDirectory() {
+        final String prefix = "kafka-";
+        final File file;
+        try {
+            file = Files.createTempDirectory(prefix).toFile();
+        } catch (final IOException ex) {
+            throw new RuntimeException("Failed to create a temp dir", ex);
+        }
+        file.deleteOnExit();
+
+        addShutdownHook("delete-temp-file-shutdown-hook", () -> {
+            try {
+                Utils.delete(file);
+            } catch (final IOException e) {
+                System.out.println("Error deleting " + file.getAbsolutePath());
+                e.printStackTrace(System.out);
+            }
+        });
+
+        return file;
+    }
+
+    public SmokeTestClient(final String name) {
+        this.name = name;
     }
 
     public boolean closed() {
         return closed;
     }
 
     public void start(final Properties streamsProperties) {
-        streams = createKafkaStreams(streamsProperties);
+        final Topology build = getTopology();
+        streams = new KafkaStreams(build, getStreamsConfig(streamsProperties));
+
+        final CountDownLatch countDownLatch = new CountDownLatch(1);
+        streams.setStateListener((newState, oldState) -> {
+            System.out.printf("%s %s: %s -> %s%n", name, Instant.now(), oldState, newState);
+            if (oldState == KafkaStreams.State.REBALANCING && newState == KafkaStreams.State.RUNNING) {
+                countDownLatch.countDown();
+            }
+
+            if (newState == KafkaStreams.State.NOT_RUNNING) {
+                closed = true;
+            }
+        });
+
         streams.setUncaughtExceptionHandler((t, e) -> {
             System.out.println(name + ": SMOKE-TEST-CLIENT-EXCEPTION");
+            System.out.println(name + ": FATAL: An unexpected exception is encountered on thread " + t + ": " + e);
+            e.printStackTrace(System.out);
             uncaughtException = true;
-            e.printStackTrace();
+            streams.close(Duration.ofSeconds(30));
         });
 
-        Exit.addShutdownHook("streams-shutdown-hook", () -> close());
+        addShutdownHook("streams-shutdown-hook", this::close);
 
-        thread = new Thread(() -> streams.start());
-        thread.start();
+        streams.start();
+        try {
+            if (!countDownLatch.await(1, TimeUnit.MINUTES)) {
+                System.out.println(name + ": SMOKE-TEST-CLIENT-EXCEPTION: Didn't start in one minute");
+            }
+        } catch (final InterruptedException e) {
+            System.out.println(name + ": SMOKE-TEST-CLIENT-EXCEPTION: " + e);
+            e.printStackTrace(System.out);
+        }
+        System.out.println(name + ": SMOKE-TEST-CLIENT-STARTED");

Review comment:
       A new message we can look for to wait until the instance has completed joining the group.

##########
File path: streams/upgrade-system-tests-22/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java
##########
@@ -0,0 +1,298 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.tests;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.KafkaThread;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.Grouped;
+import org.apache.kafka.streams.kstream.KGroupedStream;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.Produced;
+import org.apache.kafka.streams.kstream.Suppressed.BufferConfig;
+import org.apache.kafka.streams.kstream.TimeWindows;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.streams.state.WindowStore;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.kafka.streams.kstream.Suppressed.untilWindowCloses;
+
+public class SmokeTestClient extends SmokeTestUtil {

Review comment:
       All of these are just copy-pasted from the main module.

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/BufferValue.java
##########
@@ -120,7 +120,7 @@ ByteBuffer serialize(final int endPadding) {
 
         if (oldValue == null) {
             buffer.putInt(NULL_VALUE_SENTINEL);
-        } else if (priorValue == oldValue) {
+        } else if (Arrays.equals(priorValue, oldValue)) {

Review comment:
       This was correct before, since we check equality and enforce identity in the constructor, but `Arrays.equals` is extremely cheap when the arrays are identical, so explicitly doing an identity check instead of equality was a micro-optimization.

##########
File path: tests/kafkatest/tests/streams/streams_upgrade_test.py
##########
@@ -201,14 +204,29 @@ def test_simple_upgrade_downgrade(self, from_version, to_version):
         self.zk = ZookeeperService(self.test_context, num_nodes=1)
         self.zk.start()
 
-        self.kafka = KafkaService(self.test_context, num_nodes=1, zk=self.zk, topics=self.topics)
+        self.kafka = KafkaService(self.test_context, num_nodes=1, zk=self.zk, topics={

Review comment:
       A lot of these changes are part of adapting the test to the smoke test app.

##########
File path: streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java
##########
@@ -38,107 +37,128 @@
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.state.Stores;
 import org.apache.kafka.streams.state.WindowStore;
-import org.apache.kafka.test.TestUtils;
 
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
 import java.time.Duration;
 import java.time.Instant;
 import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 
 import static org.apache.kafka.streams.kstream.Suppressed.untilWindowCloses;
 
 public class SmokeTestClient extends SmokeTestUtil {
 
     private final String name;
 
-    private Thread thread;
     private KafkaStreams streams;
     private boolean uncaughtException = false;
-    private boolean started;
-    private boolean closed;
+    private volatile boolean closed;
 
-    public SmokeTestClient(final String name) {
-        super();
-        this.name = name;
+    private static void addShutdownHook(final String name, final Runnable runnable) {
+        if (name != null) {
+            Runtime.getRuntime().addShutdownHook(KafkaThread.nonDaemon(name, runnable));
+        } else {
+            Runtime.getRuntime().addShutdownHook(new Thread(runnable));
+        }
     }
 
-    public boolean started() {
-        return started;
+    private static File tempDirectory() {
+        final String prefix = "kafka-";
+        final File file;
+        try {
+            file = Files.createTempDirectory(prefix).toFile();
+        } catch (final IOException ex) {
+            throw new RuntimeException("Failed to create a temp dir", ex);
+        }
+        file.deleteOnExit();
+
+        addShutdownHook("delete-temp-file-shutdown-hook", () -> {
+            try {
+                Utils.delete(file);
+            } catch (final IOException e) {
+                System.out.println("Error deleting " + file.getAbsolutePath());
+                e.printStackTrace(System.out);
+            }
+        });
+
+        return file;
+    }
+
+    public SmokeTestClient(final String name) {
+        this.name = name;
     }
 
     public boolean closed() {
         return closed;
     }
 
     public void start(final Properties streamsProperties) {
-        streams = createKafkaStreams(streamsProperties);
+        final Topology build = getTopology();
+        streams = new KafkaStreams(build, getStreamsConfig(streamsProperties));
+
+        final CountDownLatch countDownLatch = new CountDownLatch(1);
+        streams.setStateListener((newState, oldState) -> {
+            System.out.printf("%s %s: %s -> %s%n", name, Instant.now(), oldState, newState);
+            if (oldState == KafkaStreams.State.REBALANCING && newState == KafkaStreams.State.RUNNING) {
+                countDownLatch.countDown();
+            }
+
+            if (newState == KafkaStreams.State.NOT_RUNNING) {
+                closed = true;
+            }
+        });
+
         streams.setUncaughtExceptionHandler((t, e) -> {
             System.out.println(name + ": SMOKE-TEST-CLIENT-EXCEPTION");
+            System.out.println(name + ": FATAL: An unexpected exception is encountered on thread " + t + ": " + e);
+            e.printStackTrace(System.out);
             uncaughtException = true;
-            e.printStackTrace();
+            streams.close(Duration.ofSeconds(30));
         });
 
-        Exit.addShutdownHook("streams-shutdown-hook", () -> close());
+        addShutdownHook("streams-shutdown-hook", this::close);
 
-        thread = new Thread(() -> streams.start());
-        thread.start();
+        streams.start();
+        try {
+            if (!countDownLatch.await(1, TimeUnit.MINUTES)) {
+                System.out.println(name + ": SMOKE-TEST-CLIENT-EXCEPTION: Didn't start in one minute");
+            }
+        } catch (final InterruptedException e) {
+            System.out.println(name + ": SMOKE-TEST-CLIENT-EXCEPTION: " + e);
+            e.printStackTrace(System.out);
+        }
+        System.out.println(name + ": SMOKE-TEST-CLIENT-STARTED");
+        System.out.println(name + " started at " + Instant.now());
     }
 
     public void closeAsync() {
         streams.close(Duration.ZERO);
     }
 
     public void close() {
-        streams.close(Duration.ofSeconds(5));
-        // do not remove these printouts since they are needed for health scripts
-        if (!uncaughtException) {
+        final boolean wasClosed = streams.close(Duration.ofMinutes(1));
+
+        if (wasClosed && !uncaughtException) {
             System.out.println(name + ": SMOKE-TEST-CLIENT-CLOSED");
-        }
-        try {
-            thread.join();
-        } catch (final Exception ex) {
-            // do not remove these printouts since they are needed for health scripts
+        } else if (wasClosed) {
             System.out.println(name + ": SMOKE-TEST-CLIENT-EXCEPTION");
-            // ignore
+        } else {
+            System.out.println(name + ": SMOKE-TEST-CLIENT-EXCEPTION: Didn't close");
         }
     }
 
     private Properties getStreamsConfig(final Properties props) {
         final Properties fullProps = new Properties(props);
         fullProps.put(StreamsConfig.APPLICATION_ID_CONFIG, "SmokeTest");
         fullProps.put(StreamsConfig.CLIENT_ID_CONFIG, "SmokeTest-" + name);
-        fullProps.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 3);
-        fullProps.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 2);
-        fullProps.put(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, 100);
-        fullProps.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
-        fullProps.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3);
-        fullProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-        fullProps.put(ProducerConfig.ACKS_CONFIG, "all");
-        fullProps.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
+        fullProps.put(StreamsConfig.STATE_DIR_CONFIG, tempDirectory().getAbsolutePath());
         fullProps.putAll(props);
         return fullProps;
     }
 
-    private KafkaStreams createKafkaStreams(final Properties props) {

Review comment:
       inlined above.

##########
File path: streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java
##########
@@ -38,107 +37,128 @@
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.state.Stores;
 import org.apache.kafka.streams.state.WindowStore;
-import org.apache.kafka.test.TestUtils;
 
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
 import java.time.Duration;
 import java.time.Instant;
 import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 
 import static org.apache.kafka.streams.kstream.Suppressed.untilWindowCloses;
 
 public class SmokeTestClient extends SmokeTestUtil {
 
     private final String name;
 
-    private Thread thread;
     private KafkaStreams streams;
     private boolean uncaughtException = false;
-    private boolean started;
-    private boolean closed;
+    private volatile boolean closed;
 
-    public SmokeTestClient(final String name) {
-        super();
-        this.name = name;
+    private static void addShutdownHook(final String name, final Runnable runnable) {
+        if (name != null) {
+            Runtime.getRuntime().addShutdownHook(KafkaThread.nonDaemon(name, runnable));
+        } else {
+            Runtime.getRuntime().addShutdownHook(new Thread(runnable));
+        }
     }
 
-    public boolean started() {
-        return started;
+    private static File tempDirectory() {
+        final String prefix = "kafka-";
+        final File file;
+        try {
+            file = Files.createTempDirectory(prefix).toFile();
+        } catch (final IOException ex) {
+            throw new RuntimeException("Failed to create a temp dir", ex);
+        }
+        file.deleteOnExit();
+
+        addShutdownHook("delete-temp-file-shutdown-hook", () -> {
+            try {
+                Utils.delete(file);
+            } catch (final IOException e) {
+                System.out.println("Error deleting " + file.getAbsolutePath());
+                e.printStackTrace(System.out);
+            }
+        });
+
+        return file;
+    }
+
+    public SmokeTestClient(final String name) {
+        this.name = name;
     }
 
     public boolean closed() {
         return closed;
     }
 
     public void start(final Properties streamsProperties) {
-        streams = createKafkaStreams(streamsProperties);
+        final Topology build = getTopology();
+        streams = new KafkaStreams(build, getStreamsConfig(streamsProperties));
+
+        final CountDownLatch countDownLatch = new CountDownLatch(1);
+        streams.setStateListener((newState, oldState) -> {
+            System.out.printf("%s %s: %s -> %s%n", name, Instant.now(), oldState, newState);
+            if (oldState == KafkaStreams.State.REBALANCING && newState == KafkaStreams.State.RUNNING) {
+                countDownLatch.countDown();
+            }
+
+            if (newState == KafkaStreams.State.NOT_RUNNING) {
+                closed = true;
+            }
+        });
+
         streams.setUncaughtExceptionHandler((t, e) -> {
             System.out.println(name + ": SMOKE-TEST-CLIENT-EXCEPTION");
+            System.out.println(name + ": FATAL: An unexpected exception is encountered on thread " + t + ": " + e);
+            e.printStackTrace(System.out);
             uncaughtException = true;
-            e.printStackTrace();
+            streams.close(Duration.ofSeconds(30));
         });
 
-        Exit.addShutdownHook("streams-shutdown-hook", () -> close());
+        addShutdownHook("streams-shutdown-hook", this::close);
 
-        thread = new Thread(() -> streams.start());
-        thread.start();
+        streams.start();
+        try {
+            if (!countDownLatch.await(1, TimeUnit.MINUTES)) {
+                System.out.println(name + ": SMOKE-TEST-CLIENT-EXCEPTION: Didn't start in one minute");
+            }
+        } catch (final InterruptedException e) {
+            System.out.println(name + ": SMOKE-TEST-CLIENT-EXCEPTION: " + e);
+            e.printStackTrace(System.out);
+        }
+        System.out.println(name + ": SMOKE-TEST-CLIENT-STARTED");
+        System.out.println(name + " started at " + Instant.now());
     }
 
     public void closeAsync() {
         streams.close(Duration.ZERO);
     }
 
     public void close() {
-        streams.close(Duration.ofSeconds(5));
-        // do not remove these printouts since they are needed for health scripts
-        if (!uncaughtException) {
+        final boolean wasClosed = streams.close(Duration.ofMinutes(1));
+
+        if (wasClosed && !uncaughtException) {
             System.out.println(name + ": SMOKE-TEST-CLIENT-CLOSED");
-        }
-        try {
-            thread.join();
-        } catch (final Exception ex) {
-            // do not remove these printouts since they are needed for health scripts
+        } else if (wasClosed) {
             System.out.println(name + ": SMOKE-TEST-CLIENT-EXCEPTION");
-            // ignore
+        } else {
+            System.out.println(name + ": SMOKE-TEST-CLIENT-EXCEPTION: Didn't close");
         }
     }
 
     private Properties getStreamsConfig(final Properties props) {
         final Properties fullProps = new Properties(props);
         fullProps.put(StreamsConfig.APPLICATION_ID_CONFIG, "SmokeTest");
         fullProps.put(StreamsConfig.CLIENT_ID_CONFIG, "SmokeTest-" + name);
-        fullProps.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 3);
-        fullProps.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 2);
-        fullProps.put(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, 100);
-        fullProps.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
-        fullProps.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3);
-        fullProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-        fullProps.put(ProducerConfig.ACKS_CONFIG, "all");

Review comment:
       I moved all these to the system test `propFile()` definition.

##########
File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/FullChangeSerdeTest.java
##########
@@ -19,17 +19,46 @@
 import org.apache.kafka.common.serialization.Serdes;
 import org.junit.Test;
 
+import java.nio.ByteBuffer;
+
 import static org.hamcrest.CoreMatchers.nullValue;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.core.Is.is;
 
 public class FullChangeSerdeTest {
     private final FullChangeSerde<String> serde = FullChangeSerde.wrap(Serdes.String());
 
+    /**
+     * We used to serialize a Change into a single byte[]. Now, we don't anymore, but we still keep this logic here
+     * so that we can produce the legacy format to test that we can still deserialize it.
+     */
+    private static byte[] mergeChangeArraysIntoSingleLegacyFormattedArray(final Change<byte[]> serialChange) {

Review comment:
       Moved from FullChangeSerde because it is only used in this test.

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java
##########
@@ -54,14 +56,17 @@
 import java.util.function.Supplier;
 
 import static java.util.Objects.requireNonNull;
+import static org.apache.kafka.streams.state.internals.TimeOrderedKeyValueBufferChangelogDeserializationHelper.deserializeV3;
+import static org.apache.kafka.streams.state.internals.TimeOrderedKeyValueBufferChangelogDeserializationHelper.duckTypeV2;
 
 public final class InMemoryTimeOrderedKeyValueBuffer<K, V> implements TimeOrderedKeyValueBuffer<K, V> {
     private static final BytesSerializer KEY_SERIALIZER = new BytesSerializer();
     private static final ByteArraySerializer VALUE_SERIALIZER = new ByteArraySerializer();
-    private static final RecordHeaders V_1_CHANGELOG_HEADERS =
-        new RecordHeaders(new Header[] {new RecordHeader("v", new byte[] {(byte) 1})});
-    private static final RecordHeaders V_2_CHANGELOG_HEADERS =
-        new RecordHeaders(new Header[] {new RecordHeader("v", new byte[] {(byte) 2})});
+    private static final byte[] V_1_CHANGELOG_HEADER_VALUE = {(byte) 1};
+    private static final byte[] V_2_CHANGELOG_HEADER_VALUE = {(byte) 2};
+    private static final byte[] V_3_CHANGELOG_HEADER_VALUE = {(byte) 3};
+    static final RecordHeaders CHANGELOG_HEADERS =
+        new RecordHeaders(new Header[] {new RecordHeader("v", V_3_CHANGELOG_HEADER_VALUE)});

Review comment:
       We don't need to store the whole RecordHeaders for the old versions, just the actual version flag.

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/FullChangeSerde.java
##########
@@ -68,33 +68,6 @@ private FullChangeSerde(final Serde<T> inner) {
         return new Change<>(newValue, oldValue);
     }
 
-    /**
-     * We used to serialize a Change into a single byte[]. Now, we don't anymore, but we still keep this logic here
-     * so that we can produce the legacy format to test that we can still deserialize it.
-     */
-    public static byte[] mergeChangeArraysIntoSingleLegacyFormattedArray(final Change<byte[]> serialChange) {

Review comment:
       Only used in the test now, so I moved it.

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java
##########
@@ -361,26 +366,20 @@ private void restoreBatch(final Collection<ConsumerRecord<byte[], byte[]>> batch
                             contextualRecord.recordContext()
                         )
                     );
-                } else if (V_2_CHANGELOG_HEADERS.lastHeader("v").equals(record.headers().lastHeader("v"))) {
-                    // in this case, the changelog value is a serialized BufferValue
+                } else if (Arrays.equals(versionHeader.value(), V_2_CHANGELOG_HEADER_VALUE)) {
+
+                    final DeserializationResult deserializationResult = duckTypeV2(record, key);

Review comment:
       See the comment on this method for why we need to duck-type version 2. I pulled these deserializations into a helper class because all the extra branches pushed our cyclomatic complexity over the limit.
   
   But I kept the first two branches here because they aren't pure functions. They perform a lookup in the buffer itself as part of converting the old format.

##########
File path: streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBufferTest.java
##########
@@ -372,12 +370,14 @@ public void shouldRestoreOldFormat() {
 
         context.setRecordContext(new ProcessorRecordContext(0, 0, 0, "", null));
 
-        final FullChangeSerde<String> serializer = FullChangeSerde.wrap(Serdes.String());
+        // These serialized formats were captured by running version 2.1 code.
+        // They verify that an upgrade from 2.1 will work.
+        // Do not change them.
+        final String toDeleteBinaryValue = "0000000000000000FFFFFFFF00000006646F6F6D6564";
+        final String asdfBinaryValue = "0000000000000002FFFFFFFF0000000471776572";
+        final String zxcvBinaryValue1 = "00000000000000010000000870726576696F757300000005656F34696D";
+        final String zxcvBinaryValue2 = "000000000000000100000005656F34696D000000046E657874";

Review comment:
       This was one of my major findings in KAFKA-10173. Because the test was serializing the "old versions" using code shared with the current logic, we could not detect when we accidentally changed the current serialization logic without bumping the version number.
   
   By instead testing against fixed pre-serialized data, we should be a lot safer.
   
   I took inspiration from the way that Karsten reported the observed serialized data in the bug report. Hex-encoding the binary data makes the tests more readable than a long array of byte literals.

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java
##########
@@ -481,8 +480,7 @@ public void put(final long time,
         final BufferValue buffered = getBuffered(serializedKey);
         final byte[] serializedPriorValue;
         if (buffered == null) {
-            final V priorValue = value.oldValue;
-            serializedPriorValue = (priorValue == null) ? null : valueSerde.innerSerde().serializer().serialize(changelogTopic, priorValue);
+            serializedPriorValue = serialChange.oldValue;

Review comment:
       If you look a few lines up, you'll see that we just serialized the "old value", so we don't need to serialize it again here.

##########
File path: tests/kafkatest/tests/streams/streams_upgrade_test.py
##########
@@ -37,6 +37,9 @@
 # can be replaced with metadata_2_versions
 backward_compatible_metadata_2_versions = [str(LATEST_0_10_2), str(LATEST_0_11_0), str(LATEST_1_0), str(LATEST_1_1)]
 metadata_3_or_higher_versions = [str(LATEST_2_0), str(LATEST_2_1), str(LATEST_2_2), str(LATEST_2_3), str(LATEST_2_4), str(LATEST_2_5), str(DEV_VERSION)]
+smoke_test_versions = [str(LATEST_2_2), str(LATEST_2_3), str(LATEST_2_4), str(LATEST_2_5)]

Review comment:
       See KAFKA-10203 for why I couldn't go past 2.2

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java
##########
@@ -361,26 +366,20 @@ private void restoreBatch(final Collection<ConsumerRecord<byte[], byte[]>> batch
                             contextualRecord.recordContext()
                         )
                     );
-                } else if (V_2_CHANGELOG_HEADERS.lastHeader("v").equals(record.headers().lastHeader("v"))) {
-                    // in this case, the changelog value is a serialized BufferValue
+                } else if (Arrays.equals(versionHeader.value(), V_2_CHANGELOG_HEADER_VALUE)) {
+
+                    final DeserializationResult deserializationResult = duckTypeV2(record, key);
+                    cleanPut(deserializationResult.time(), deserializationResult.key(), deserializationResult.bufferValue());
+
+                } else if (Arrays.equals(versionHeader.value(), V_3_CHANGELOG_HEADER_VALUE)) {

Review comment:
       This is the version bump we should have done in 2.4.0. I'll backport this fix to the 2.4 branch.

##########
File path: streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java
##########
@@ -38,107 +37,128 @@
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.state.Stores;
 import org.apache.kafka.streams.state.WindowStore;
-import org.apache.kafka.test.TestUtils;
 
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
 import java.time.Duration;
 import java.time.Instant;
 import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 
 import static org.apache.kafka.streams.kstream.Suppressed.untilWindowCloses;
 
 public class SmokeTestClient extends SmokeTestUtil {
 
     private final String name;
 
-    private Thread thread;
     private KafkaStreams streams;
     private boolean uncaughtException = false;
-    private boolean started;
-    private boolean closed;
+    private volatile boolean closed;
 
-    public SmokeTestClient(final String name) {
-        super();
-        this.name = name;
+    private static void addShutdownHook(final String name, final Runnable runnable) {

Review comment:
       I inlined these utilities to make this class more "portable". I.e., so that we can copy-paste it into the upgrade-test modules without dragging in a bunch of extra dependencies.

##########
File path: streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java
##########
@@ -38,107 +37,128 @@
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.state.Stores;
 import org.apache.kafka.streams.state.WindowStore;
-import org.apache.kafka.test.TestUtils;
 
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
 import java.time.Duration;
 import java.time.Instant;
 import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 
 import static org.apache.kafka.streams.kstream.Suppressed.untilWindowCloses;
 
 public class SmokeTestClient extends SmokeTestUtil {
 
     private final String name;
 
-    private Thread thread;
     private KafkaStreams streams;
     private boolean uncaughtException = false;
-    private boolean started;
-    private boolean closed;
+    private volatile boolean closed;
 
-    public SmokeTestClient(final String name) {
-        super();
-        this.name = name;
+    private static void addShutdownHook(final String name, final Runnable runnable) {
+        if (name != null) {
+            Runtime.getRuntime().addShutdownHook(KafkaThread.nonDaemon(name, runnable));
+        } else {
+            Runtime.getRuntime().addShutdownHook(new Thread(runnable));
+        }
     }
 
-    public boolean started() {
-        return started;
+    private static File tempDirectory() {
+        final String prefix = "kafka-";
+        final File file;
+        try {
+            file = Files.createTempDirectory(prefix).toFile();
+        } catch (final IOException ex) {
+            throw new RuntimeException("Failed to create a temp dir", ex);
+        }
+        file.deleteOnExit();
+
+        addShutdownHook("delete-temp-file-shutdown-hook", () -> {
+            try {
+                Utils.delete(file);
+            } catch (final IOException e) {
+                System.out.println("Error deleting " + file.getAbsolutePath());
+                e.printStackTrace(System.out);
+            }
+        });
+
+        return file;
+    }
+
+    public SmokeTestClient(final String name) {
+        this.name = name;
     }
 
     public boolean closed() {
         return closed;
     }
 
     public void start(final Properties streamsProperties) {
-        streams = createKafkaStreams(streamsProperties);
+        final Topology build = getTopology();

Review comment:
       This was another bug I happened to notice while scrutinizing this system test. `createKafkaStreams` was registering a state listener and exception handler. But the next line here was overriding the exception handler, so the one registered in `createKafkaStreams` was getting ignored. I noticed it because I registered a state listener here, which also caused the one registered in `createKafkaStreams` to get ignored.
   
   Inlining solves this problem, and since `createKafkaStreams` had only one usage, it was needless anyway.

##########
File path: tests/kafkatest/tests/streams/streams_upgrade_test.py
##########
@@ -189,8 +192,8 @@ def test_upgrade_downgrade_brokers(self, from_version, to_version):
         processor.stop()
         processor.node.account.ssh_capture("grep SMOKE-TEST-CLIENT-CLOSED %s" % processor.STDOUT_FILE, allow_fail=False)
 
-    @matrix(from_version=metadata_2_versions, to_version=metadata_2_versions)
-    def test_simple_upgrade_downgrade(self, from_version, to_version):
+    @matrix(from_version=smoke_test_versions, to_version=dev_version)

Review comment:
       We were previously not testing 2.0+ _at all_. After rewriting this as a smoke test, it only applies to 2.2+. I also figured it makes more sense just to test upgrades to the current branch, rather than testing cross-upgrades between every pair of versions.

##########
File path: streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java
##########
@@ -75,7 +75,7 @@ public void process(final Object key, final Object value) {
 
                     @Override
                     public void close() {
-                        System.out.printf("Close processor for task %s", context().taskId());
+                        System.out.printf("Close processor for task %s%n", context().taskId());

Review comment:
       Oops! I noticed the lack of a newline in the output. It didn't matter for the tests because the greps aren't bounded by line.

##########
File path: tests/kafkatest/services/streams.py
##########
@@ -305,23 +305,62 @@ def start_node(self, node):
 class StreamsSmokeTestBaseService(StreamsTestBaseService):
     """Base class for Streams Smoke Test services providing some common settings and functionality"""
 
-    def __init__(self, test_context, kafka, command, processing_guarantee = 'at_least_once', num_threads = 3):
+    def __init__(self, test_context, kafka, command, processing_guarantee = 'at_least_once', num_threads = 3, replication_factor = 3):

Review comment:
       Adding a param so that we can get away with just one broker in the upgrade test.

##########
File path: tests/kafkatest/services/streams.py
##########
@@ -305,23 +305,62 @@ def start_node(self, node):
 class StreamsSmokeTestBaseService(StreamsTestBaseService):
     """Base class for Streams Smoke Test services providing some common settings and functionality"""
 
-    def __init__(self, test_context, kafka, command, processing_guarantee = 'at_least_once', num_threads = 3):
+    def __init__(self, test_context, kafka, command, processing_guarantee = 'at_least_once', num_threads = 3, replication_factor = 3):
         super(StreamsSmokeTestBaseService, self).__init__(test_context,
                                                           kafka,
                                                           "org.apache.kafka.streams.tests.StreamsSmokeTest",
                                                           command)
         self.NUM_THREADS = num_threads
         self.PROCESSING_GUARANTEE = processing_guarantee
+        self.KAFKA_STREAMS_VERSION = ""
+        self.UPGRADE_FROM = None
+        self.REPLICATION_FACTOR = replication_factor
+
+    def set_version(self, kafka_streams_version):
+        self.KAFKA_STREAMS_VERSION = kafka_streams_version
+
+    def set_upgrade_from(self, upgrade_from):
+        self.UPGRADE_FROM = upgrade_from

Review comment:
       Added for the upgrade test.

##########
File path: tests/kafkatest/services/streams.py
##########
@@ -305,23 +305,62 @@ def start_node(self, node):
 class StreamsSmokeTestBaseService(StreamsTestBaseService):
     """Base class for Streams Smoke Test services providing some common settings and functionality"""
 
-    def __init__(self, test_context, kafka, command, processing_guarantee = 'at_least_once', num_threads = 3):
+    def __init__(self, test_context, kafka, command, processing_guarantee = 'at_least_once', num_threads = 3, replication_factor = 3):
         super(StreamsSmokeTestBaseService, self).__init__(test_context,
                                                           kafka,
                                                           "org.apache.kafka.streams.tests.StreamsSmokeTest",
                                                           command)
         self.NUM_THREADS = num_threads
         self.PROCESSING_GUARANTEE = processing_guarantee
+        self.KAFKA_STREAMS_VERSION = ""
+        self.UPGRADE_FROM = None
+        self.REPLICATION_FACTOR = replication_factor
+
+    def set_version(self, kafka_streams_version):
+        self.KAFKA_STREAMS_VERSION = kafka_streams_version
+
+    def set_upgrade_from(self, upgrade_from):
+        self.UPGRADE_FROM = upgrade_from
 
     def prop_file(self):
         properties = {streams_property.STATE_DIR: self.PERSISTENT_ROOT,
                       streams_property.KAFKA_SERVERS: self.kafka.bootstrap_servers(),
                       streams_property.PROCESSING_GUARANTEE: self.PROCESSING_GUARANTEE,
-                      streams_property.NUM_THREADS: self.NUM_THREADS}
+                      streams_property.NUM_THREADS: self.NUM_THREADS,
+                      "replication.factor": self.REPLICATION_FACTOR,
+                      "num.standby.replicas": 2,
+                      "buffered.records.per.partition": 100,
+                      "commit.interval.ms": 1000,
+                      "auto.offset.reset": "earliest",
+                      "acks": "all"}

Review comment:
       Moved from the java code so that all the configs can be defined together.

##########
File path: tests/kafkatest/tests/streams/streams_upgrade_test.py
##########
@@ -349,56 +370,42 @@ def get_version_string(self, version):
     def start_all_nodes_with(self, version):

Review comment:
       I refactored this method to start all the nodes concurrently, rather than one at a time. We still do a rolling upgrade, but there's no need to do a rolling startup.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] guozhangwang commented on a change in pull request #8905: KAFKA-10173: Directly use Arrays.equals for version comparison

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on a change in pull request #8905:
URL: https://github.com/apache/kafka/pull/8905#discussion_r443180740



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java
##########
@@ -58,10 +59,12 @@
 public final class InMemoryTimeOrderedKeyValueBuffer<K, V> implements TimeOrderedKeyValueBuffer<K, V> {
     private static final BytesSerializer KEY_SERIALIZER = new BytesSerializer();
     private static final ByteArraySerializer VALUE_SERIALIZER = new ByteArraySerializer();
+    private static final byte[] V_1_CHANGELOG_HEADER_VALUE = {(byte) 1};
     private static final RecordHeaders V_1_CHANGELOG_HEADERS =

Review comment:
       I saw it is used in line 342.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] chia7712 commented on a change in pull request #8905: KAFKA-10173: Directly use Arrays.equals for version comparison

Posted by GitBox <gi...@apache.org>.
chia7712 commented on a change in pull request #8905:
URL: https://github.com/apache/kafka/pull/8905#discussion_r443137719



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java
##########
@@ -58,10 +59,12 @@
 public final class InMemoryTimeOrderedKeyValueBuffer<K, V> implements TimeOrderedKeyValueBuffer<K, V> {
     private static final BytesSerializer KEY_SERIALIZER = new BytesSerializer();
     private static final ByteArraySerializer VALUE_SERIALIZER = new ByteArraySerializer();
+    private static final byte[] V_1_CHANGELOG_HEADER_VALUE = {(byte) 1};
     private static final RecordHeaders V_1_CHANGELOG_HEADERS =

Review comment:
       my IDEA says this variable is never used. 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vvcephei commented on a change in pull request #8905: KAFKA-10173: Fix suppress changelog binary schema compatibility

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #8905:
URL: https://github.com/apache/kafka/pull/8905#discussion_r446409619



##########
File path: build.gradle
##########
@@ -97,7 +97,7 @@ ext {
   buildVersionFileName = "kafka-version.properties"
 
   defaultMaxHeapSize = "2g"
-  defaultJvmArgs = ["-Xss4m", "-XX:+UseParallelGC"]
+  defaultJvmArgs = ["-Xss4m"]

Review comment:
       Aha! I figured it out. There actually was a bug in the test. While duck-typing, the code was trying to allocate an array of 1.8GB. It's funny that disabling this flag made this test pass on java 11 and 14. Maybe the flag partitions the heap on those versions or something, so the test didn't actually have the full 2GB available. Anyway, I'm about to push a fix and put the flag back.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vvcephei commented on pull request #8905: KAFKA-10173: Fix suppress changelog binary schema compatibility

Posted by GitBox <gi...@apache.org>.
vvcephei commented on pull request #8905:
URL: https://github.com/apache/kafka/pull/8905#issuecomment-650483907


   backported to 2.6, 2.5, and 2.4. I ran the streams and client tests each time, as well as systemTestLibs.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] guozhangwang commented on pull request #8905: KAFKA-10173: Fix suppress changelog binary schema compatibility

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on pull request #8905:
URL: https://github.com/apache/kafka/pull/8905#issuecomment-650354464


   test this


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vvcephei commented on pull request #8905: KAFKA-10173: Fix suppress changelog binary schema compatibility

Posted by GitBox <gi...@apache.org>.
vvcephei commented on pull request #8905:
URL: https://github.com/apache/kafka/pull/8905#issuecomment-650413094


   I will follow up shortly to extract the system tests to a separate PR, since we're having trouble running the tests at all right now, and we wouldn't know if they are even more broken.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vvcephei commented on pull request #8905: KAFKA-10173: Fix suppress changelog binary schema compatibility

Posted by GitBox <gi...@apache.org>.
vvcephei commented on pull request #8905:
URL: https://github.com/apache/kafka/pull/8905#issuecomment-650387794


   Ah, that heap space thing was legit. Fix coming...


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vvcephei merged pull request #8905: KAFKA-10173: Fix suppress changelog binary schema compatibility

Posted by GitBox <gi...@apache.org>.
vvcephei merged pull request #8905:
URL: https://github.com/apache/kafka/pull/8905


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vvcephei commented on pull request #8905: KAFKA-10173: Fix suppress changelog binary schema compatibility

Posted by GitBox <gi...@apache.org>.
vvcephei commented on pull request #8905:
URL: https://github.com/apache/kafka/pull/8905#issuecomment-650412771


   Hey @guozhangwang , you might want to take a look at that last fix. The duck-typing code was producing an OOME some times, when it would just interpret a random integer out of the buffer as a "size" (integer) and blindly allocate an array of that size.
   
   I added a Util (with tests) that has a guard to prevent this.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vvcephei commented on a change in pull request #8905: KAFKA-10173: Fix suppress changelog binary schema compatibility

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #8905:
URL: https://github.com/apache/kafka/pull/8905#discussion_r446363798



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java
##########
@@ -361,26 +366,20 @@ private void restoreBatch(final Collection<ConsumerRecord<byte[], byte[]>> batch
                             contextualRecord.recordContext()
                         )
                     );
-                } else if (V_2_CHANGELOG_HEADERS.lastHeader("v").equals(record.headers().lastHeader("v"))) {
-                    // in this case, the changelog value is a serialized BufferValue
+                } else if (Arrays.equals(versionHeader.value(), V_2_CHANGELOG_HEADER_VALUE)) {
+
+                    final DeserializationResult deserializationResult = duckTypeV2(record, key);

Review comment:
       Sorry, the comments in `duckTypeV2`.
   
   Basically, because we released three versions that would write data in the "v3" format, but with the "v2" flag, when we see the v2 flag, the data might be in v2 format or v3 format. The only way to tell is to just try to deserialize it in v2 format, and if we get an exception, then to try with v3 format.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] guozhangwang commented on a change in pull request #8905: KAFKA-10173: Fix suppress changelog binary schema compatibility

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on a change in pull request #8905:
URL: https://github.com/apache/kafka/pull/8905#discussion_r446461812



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java
##########
@@ -258,34 +263,43 @@ private void logValue(final Bytes key, final BufferKey bufferKey, final BufferVa
         final int sizeOfBufferTime = Long.BYTES;
         final ByteBuffer buffer = value.serialize(sizeOfBufferTime);
         buffer.putLong(bufferKey.time());
-
+        final byte[] array = buffer.array();
         ((RecordCollector.Supplier) context).recordCollector().send(
-                changelogTopic,
-                key,
-                buffer.array(),
-                V_2_CHANGELOG_HEADERS,
-                partition,
-                null,
-                KEY_SERIALIZER,
-                VALUE_SERIALIZER
+            changelogTopic,
+            key,
+            array,
+            CHANGELOG_HEADERS,
+            partition,
+            null,
+            KEY_SERIALIZER,
+            VALUE_SERIALIZER
         );
     }
 
     private void logTombstone(final Bytes key) {
         ((RecordCollector.Supplier) context).recordCollector().send(
-                changelogTopic,
-                key,
-                null,
-                null,
-                partition,
-                null,
-                KEY_SERIALIZER,
-                VALUE_SERIALIZER
+            changelogTopic,
+            key,
+            null,
+            null,

Review comment:
       SG




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vvcephei commented on a change in pull request #8905: KAFKA-10173: Fix suppress changelog binary schema compatibility

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #8905:
URL: https://github.com/apache/kafka/pull/8905#discussion_r446364364



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBufferChangelogDeserializationHelper.java
##########
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.kstream.internals.Change;
+import org.apache.kafka.streams.kstream.internals.FullChangeSerde;
+
+import java.nio.ByteBuffer;
+
+import static java.util.Objects.requireNonNull;
+
+final class TimeOrderedKeyValueBufferChangelogDeserializationHelper {
+    private TimeOrderedKeyValueBufferChangelogDeserializationHelper() {}
+
+    static final class DeserializationResult {
+        private final long time;
+        private final Bytes key;
+        private final BufferValue bufferValue;
+
+        private DeserializationResult(final long time, final Bytes key, final BufferValue bufferValue) {
+            this.time = time;
+            this.key = key;
+            this.bufferValue = bufferValue;
+        }
+
+        long time() {
+            return time;
+        }
+
+        Bytes key() {
+            return key;
+        }
+
+        BufferValue bufferValue() {
+            return bufferValue;
+        }
+    }
+
+
+    static DeserializationResult duckTypeV2(final ConsumerRecord<byte[], byte[]> record, final Bytes key) {
+        DeserializationResult deserializationResult = null;
+        RuntimeException v2DeserializationException = null;
+        RuntimeException v3DeserializationException = null;
+        try {
+            deserializationResult = deserializeV2(record, key);
+        } catch (final RuntimeException e) {
+            v2DeserializationException = e;
+        }
+        // versions 2.4.0, 2.4.1, and 2.5.0 would have erroneously encoded a V3 record with the
+        // V2 header, so we'll try duck-typing to see if this is decodable as V3
+        if (deserializationResult == null) {
+            try {
+                deserializationResult = deserializeV3(record, key);
+            } catch (final RuntimeException e) {
+                v3DeserializationException = e;
+            }
+        }
+
+        if (deserializationResult == null) {
+            // ok, it wasn't V3 either. Throw both exceptions:
+            final RuntimeException exception =
+                new RuntimeException("Couldn't deserialize record as v2 or v3: " + record,
+                                     v2DeserializationException);
+            exception.addSuppressed(v3DeserializationException);
+            throw exception;
+        }
+        return deserializationResult;
+    }
+
+    private static DeserializationResult deserializeV2(final ConsumerRecord<byte[], byte[]> record,

Review comment:
       sure thing!




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vvcephei commented on a change in pull request #8905: KAFKA-10173: Directly use Arrays.equals for version comparison

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #8905:
URL: https://github.com/apache/kafka/pull/8905#discussion_r443253760



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java
##########
@@ -58,10 +59,12 @@
 public final class InMemoryTimeOrderedKeyValueBuffer<K, V> implements TimeOrderedKeyValueBuffer<K, V> {
     private static final BytesSerializer KEY_SERIALIZER = new BytesSerializer();
     private static final ByteArraySerializer VALUE_SERIALIZER = new ByteArraySerializer();
+    private static final byte[] V_1_CHANGELOG_HEADER_VALUE = {(byte) 1};
     private static final RecordHeaders V_1_CHANGELOG_HEADERS =

Review comment:
       Ah, right. My mistake. Thanks for pointing it out.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] chia7712 commented on a change in pull request #8905: KAFKA-10173: Directly use Arrays.equals for version comparison

Posted by GitBox <gi...@apache.org>.
chia7712 commented on a change in pull request #8905:
URL: https://github.com/apache/kafka/pull/8905#discussion_r443137945



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java
##########
@@ -299,16 +311,6 @@ private void restoreBatch(final Collection<ConsumerRecord<byte[], byte[]>> batch
                         minTimestamp = sortedMap.isEmpty() ? Long.MAX_VALUE : sortedMap.firstKey().time();
                     }
                 }
-
-                if (record.partition() != partition) {
-                    throw new IllegalStateException(
-                        String.format(
-                            "record partition [%d] is being restored by the wrong suppress partition [%d]",
-                            record.partition(),
-                            partition
-                        )
-                    );
-                }
             } else {
                 if (record.headers().lastHeader("v") == null) {

Review comment:
       nit:
   We seek the last header many times. Could we reuse the return value?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vvcephei commented on a change in pull request #8905: KAFKA-10173: Directly use Arrays.equals for version comparison

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #8905:
URL: https://github.com/apache/kafka/pull/8905#discussion_r443062971



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java
##########
@@ -339,7 +341,7 @@ private void restoreBatch(final Collection<ConsumerRecord<byte[], byte[]>> batch
                             recordContext
                         )
                     );
-                } else if (V_1_CHANGELOG_HEADERS.lastHeader("v").equals(record.headers().lastHeader("v"))) {
+                } else if (Arrays.equals(record.headers().lastHeader("v").value(), V_1_CHANGELOG_HEADER_VALUE)) {

Review comment:
       This is the fix (although it was probably fine before). The implementation of Header.equals is not specified by any contract, so it's safer to perform a direct comparison on the header values. Just as before, I'm comparing byte arrays to avoid deserializing the value.

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java
##########
@@ -286,6 +289,15 @@ private void logTombstone(final Bytes key) {
 
     private void restoreBatch(final Collection<ConsumerRecord<byte[], byte[]>> batch) {
         for (final ConsumerRecord<byte[], byte[]> record : batch) {
+            if (record.partition() != partition) {
+                throw new IllegalStateException(
+                    String.format(
+                        "record partition [%d] is being restored by the wrong suppress partition [%d]",
+                        record.partition(),
+                        partition
+                    )
+                );
+            }

Review comment:
       On the side, I realized we can consolidate this check and perform it first, rather than after we're already written bad data into the buffer.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vvcephei commented on a change in pull request #8905: KAFKA-10173: Directly use Arrays.equals for version comparison

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #8905:
URL: https://github.com/apache/kafka/pull/8905#discussion_r443253796



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java
##########
@@ -299,16 +311,6 @@ private void restoreBatch(final Collection<ConsumerRecord<byte[], byte[]>> batch
                         minTimestamp = sortedMap.isEmpty() ? Long.MAX_VALUE : sortedMap.firstKey().time();
                     }
                 }
-
-                if (record.partition() != partition) {
-                    throw new IllegalStateException(
-                        String.format(
-                            "record partition [%d] is being restored by the wrong suppress partition [%d]",
-                            record.partition(),
-                            partition
-                        )
-                    );
-                }
             } else {
                 if (record.headers().lastHeader("v") == null) {

Review comment:
       Sure!




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vvcephei commented on pull request #8905: KAFKA-10173: Fix suppress changelog binary schema compatibility

Posted by GitBox <gi...@apache.org>.
vvcephei commented on pull request #8905:
URL: https://github.com/apache/kafka/pull/8905#issuecomment-650462969


   Ok, @guozhangwang , This is my "final" iteration. I pulled the system tests out, and I'll follow up with another PR later. This PR should be sufficient for the basic purpose, thanks to the new "binary" compatibility unit tests.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vvcephei commented on pull request #8905: KAFKA-10173: Fix suppress changelog binary schema compatibility

Posted by GitBox <gi...@apache.org>.
vvcephei commented on pull request #8905:
URL: https://github.com/apache/kafka/pull/8905#issuecomment-649844692


   I'm still cleaning up this PR. I'll call for reviews when it's ready.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vvcephei commented on a change in pull request #8905: KAFKA-10173: Fix suppress changelog binary schema compatibility

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #8905:
URL: https://github.com/apache/kafka/pull/8905#discussion_r446364196



##########
File path: streams/upgrade-system-tests-25/src/test/java/org/apache/kafka/streams/tests/StreamsSmokeTest.java
##########
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.tests;
+
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.StreamsConfig;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.UUID;
+
+import static org.apache.kafka.streams.tests.SmokeTestDriver.generate;
+import static org.apache.kafka.streams.tests.SmokeTestDriver.generatePerpetually;
+
+public class StreamsSmokeTest {

Review comment:
       That's correct.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vvcephei commented on pull request #8905: KAFKA-10173: Fix suppress changelog binary schema compatibility

Posted by GitBox <gi...@apache.org>.
vvcephei commented on pull request #8905:
URL: https://github.com/apache/kafka/pull/8905#issuecomment-650354756


   Hmm. Still saw the OOME in https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/3134/


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vvcephei commented on pull request #8905: KAFKA-10173: Fix suppress changelog binary schema compatibility

Posted by GitBox <gi...@apache.org>.
vvcephei commented on pull request #8905:
URL: https://github.com/apache/kafka/pull/8905#issuecomment-650476597


   Failures were unrelated:
   
   ```
   kafka.api.PlaintextConsumerTest > testLowMaxFetchSizeForRequestAndPartition FAILED
       org.scalatest.exceptions.TestFailedException: Timed out before consuming expected 2700 records. The number consumed was 1077.
           at org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:530)
           at org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:529)
           at org.scalatest.Assertions$.newAssertionFailedException(Assertions.scala:1389)
           at org.scalatest.Assertions.fail(Assertions.scala:1091)
           at org.scalatest.Assertions.fail$(Assertions.scala:1087)
           at org.scalatest.Assertions$.fail(Assertions.scala:1389)
           at kafka.api.AbstractConsumerTest.consumeRecords(AbstractConsumerTest.scala:158)
           at kafka.api.PlaintextConsumerTest.testLowMaxFetchSizeForRequestAndPartition(PlaintextConsumerTest.scala:804)
   ```
   
   ```
   org.apache.kafka.connect.mirror.MirrorConnectorsIntegrationTest > testReplication FAILED
       java.lang.RuntimeException: Could not find enough records. found 0, expected 100
           at org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster.consume(EmbeddedKafkaCluster.java:435)
           at org.apache.kafka.connect.mirror.MirrorConnectorsIntegrationTest.testReplication(MirrorConnectorsIntegrationTest.java:217)
   ```
   
   ```
   kafka.api.PlaintextConsumerTest.testLowMaxFetchSizeForRequestAndPartition
   ```
   
   ```
   org.apache.kafka.streams.integration.HighAvailabilityTaskAssignorIntegrationTest.shouldScaleOutWithWarmupTasksAndPersistentStores
   ```
   
   ```
   org.apache.kafka.connect.integration.BlockingConnectorTest.testBlockInConnectorStop
   ```


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org