You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2022/04/15 18:29:02 UTC

[kafka] branch trunk updated: KAFKA-13769: Explicitly route FK join results to correct partitions (#11945)

This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new adf5cc5371 KAFKA-13769: Explicitly route FK join results to correct partitions (#11945)
adf5cc5371 is described below

commit adf5cc5371db6253e070b11dc78dedc99c8065f9
Author: Aleksandr Sorokoumov <91...@users.noreply.github.com>
AuthorDate: Fri Apr 15 20:28:43 2022 +0200

    KAFKA-13769: Explicitly route FK join results to correct partitions (#11945)
    
    Prior to this commit, the FK response sink routed FK results to
    SubscriptionResolverJoinProcessorSupplier using the primary key.
    
    There are cases where this behavior is incorrect, for example,
    if the KTable key serde differs from the data source serde, which
    might happen without a key-changing operation.
    
    Instead of determining the resolver partition by serializing the PK,
    this patch includes the target partition in SubscriptionWrapper
    payloads. The default FK response-sink partitioner extracts the
    correct partition from the value and routes the message accordingly.
    
    Reviewers: Matthias J. Sax <ma...@confluent.io>
---
 checkstyle/suppressions.xml                        |   2 +-
 docs/streams/upgrade-guide.html                    |   6 +-
 .../org/apache/kafka/streams/StreamsConfig.java    |  65 ++++++-
 .../streams/kstream/internals/KTableImpl.java      |   2 +-
 .../ForeignJoinSubscriptionProcessorSupplier.java  |   5 +-
 ...reignJoinSubscriptionSendProcessorSupplier.java |  28 ++-
 .../SubscriptionJoinForeignProcessorSupplier.java  |  14 +-
 .../SubscriptionResponseWrapper.java               |  23 ++-
 .../SubscriptionResponseWrapperSerde.java          |   5 +-
 .../foreignkeyjoin/SubscriptionWrapper.java        |  19 +-
 .../foreignkeyjoin/SubscriptionWrapperSerde.java   | 107 +++++++++--
 .../assignment/AssignorConfiguration.java          |  22 +++
 ...scriptionResolverJoinProcessorSupplierTest.java |  12 +-
 .../SubscriptionResponseWrapperSerdeTest.java      |  16 +-
 .../SubscriptionWrapperSerdeTest.java              | 199 +++++++++++++++++++--
 15 files changed, 458 insertions(+), 67 deletions(-)

diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index c1d3cb078d..2a2f7fb0f5 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -182,7 +182,7 @@
               files="(KafkaStreams|KStreamImpl|KTableImpl).java"/>
 
     <suppress checks="CyclomaticComplexity"
-              files="(KafkaStreams|StreamsPartitionAssignor|StreamThread|TaskManager|PartitionGroup).java"/>
+              files="(KafkaStreams|StreamsPartitionAssignor|StreamThread|TaskManager|PartitionGroup|SubscriptionWrapperSerde|AssignorConfiguration).java"/>
 
     <suppress checks="StaticVariableName"
               files="StreamsMetricsImpl.java"/>
diff --git a/docs/streams/upgrade-guide.html b/docs/streams/upgrade-guide.html
index febfc65ad5..7b6075d6ad 100644
--- a/docs/streams/upgrade-guide.html
+++ b/docs/streams/upgrade-guide.html
@@ -34,9 +34,9 @@
     </div>
 
     <p>
-        Upgrading from any older version to {{fullDotVersion}} is possible: if upgrading from 2.3 or below, you will need to do two rolling bounces, where during the first rolling bounce phase you set the config <code>upgrade.from="older version"</code>
-        (possible values are <code>"0.10.0" - "2.3"</code>) and during the second you remove it. This is required to safely upgrade to the new cooperative rebalancing protocol of the embedded consumer. Note that you will remain using the old eager
-        rebalancing protocol if you skip or delay the second rolling bounce, but you can safely switch over to cooperative at any time once the entire group is on 2.4+ by removing the config value and bouncing. For more details please refer to
+        Upgrading from any older version to {{fullDotVersion}} is possible: if upgrading from 3.2 or below, you will need to do two rolling bounces, where during the first rolling bounce phase you set the config <code>upgrade.from="older version"</code>
+        (possible values are <code>"0.10.0" - "3.2"</code>) and during the second you remove it. This is required to safely handle 2 changes. The first is introduction of the new cooperative rebalancing protocol of the embedded consumer. The second is a change in foreign-key join serialization format.
+        Note that you will remain using the old eager rebalancing protocol if you skip or delay the second rolling bounce, but you can safely switch over to cooperative at any time once the entire group is on 2.4+ by removing the config value and bouncing. For more details please refer to
         <a href="https://cwiki.apache.org/confluence/x/vAclBg">KIP-429</a>:
     </p>
     <ul>
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index 740f7af5da..2f1134b43e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -301,6 +301,54 @@ public class StreamsConfig extends AbstractConfig {
     @SuppressWarnings("WeakerAccess")
     public static final String UPGRADE_FROM_23 = "2.3";
 
+    /**
+     * Config value for parameter {@link #UPGRADE_FROM_CONFIG "upgrade.from"} for upgrading an application from version {@code 2.4.x}.
+     */
+    @SuppressWarnings("WeakerAccess")
+    public static final String UPGRADE_FROM_24 = "2.4";
+
+    /**
+     * Config value for parameter {@link #UPGRADE_FROM_CONFIG "upgrade.from"} for upgrading an application from version {@code 2.5.x}.
+     */
+    @SuppressWarnings("WeakerAccess")
+    public static final String UPGRADE_FROM_25 = "2.5";
+
+    /**
+     * Config value for parameter {@link #UPGRADE_FROM_CONFIG "upgrade.from"} for upgrading an application from version {@code 2.6.x}.
+     */
+    @SuppressWarnings("WeakerAccess")
+    public static final String UPGRADE_FROM_26 = "2.6";
+
+    /**
+     * Config value for parameter {@link #UPGRADE_FROM_CONFIG "upgrade.from"} for upgrading an application from version {@code 2.7.x}.
+     */
+    @SuppressWarnings("WeakerAccess")
+    public static final String UPGRADE_FROM_27 = "2.7";
+
+    /**
+     * Config value for parameter {@link #UPGRADE_FROM_CONFIG "upgrade.from"} for upgrading an application from version {@code 2.8.x}.
+     */
+    @SuppressWarnings("WeakerAccess")
+    public static final String UPGRADE_FROM_28 = "2.8";
+
+    /**
+     * Config value for parameter {@link #UPGRADE_FROM_CONFIG "upgrade.from"} for upgrading an application from version {@code 3.0.x}.
+     */
+    @SuppressWarnings("WeakerAccess")
+    public static final String UPGRADE_FROM_30 = "3.0";
+
+    /**
+     * Config value for parameter {@link #UPGRADE_FROM_CONFIG "upgrade.from"} for upgrading an application from version {@code 3.1.x}.
+     */
+    @SuppressWarnings("WeakerAccess")
+    public static final String UPGRADE_FROM_31 = "3.1";
+
+    /**
+     * Config value for parameter {@link #UPGRADE_FROM_CONFIG "upgrade.from"} for upgrading an application from version {@code 3.2.x}.
+     */
+    @SuppressWarnings("WeakerAccess")
+    public static final String UPGRADE_FROM_32 = "3.2";
+
     /**
      * Config value for parameter {@link #PROCESSING_GUARANTEE_CONFIG "processing.guarantee"} for at-least-once processing guarantees.
      */
@@ -632,11 +680,14 @@ public class StreamsConfig extends AbstractConfig {
     public static final String UPGRADE_FROM_CONFIG = "upgrade.from";
     private static final String UPGRADE_FROM_DOC = "Allows upgrading in a backward compatible way. " +
         "This is needed when upgrading from [0.10.0, 1.1] to 2.0+, or when upgrading from [2.0, 2.3] to 2.4+. " +
-        "When upgrading from 2.4 to a newer version it is not required to specify this config. Default is `null`. " +
+        "When upgrading from 3.3 to a newer version it is not required to specify this config. Default is `null`. " +
         "Accepted values are \"" + UPGRADE_FROM_0100 + "\", \"" + UPGRADE_FROM_0101 + "\", \"" +
         UPGRADE_FROM_0102 + "\", \"" + UPGRADE_FROM_0110 + "\", \"" + UPGRADE_FROM_10 + "\", \"" +
         UPGRADE_FROM_11 + "\", \"" + UPGRADE_FROM_20 + "\", \"" + UPGRADE_FROM_21 + "\", \"" +
-        UPGRADE_FROM_22 + "\", \"" + UPGRADE_FROM_23 + "\" (for upgrading from the corresponding old version).";
+        UPGRADE_FROM_22 + "\", \"" + UPGRADE_FROM_23 + "\", \"" + UPGRADE_FROM_24 + "\", \"" +
+        UPGRADE_FROM_25 + "\", \"" + UPGRADE_FROM_26 + "\", \"" + UPGRADE_FROM_27 + "\", \"" +
+        UPGRADE_FROM_28 + "\", \"" + UPGRADE_FROM_30 + "\", \"" + UPGRADE_FROM_31 + "\", \"" +
+        UPGRADE_FROM_32 + "\" (for upgrading from the corresponding old version).";
 
     /** {@code windowstore.changelog.additional.retention.ms} */
     @SuppressWarnings("WeakerAccess")
@@ -960,7 +1011,15 @@ public class StreamsConfig extends AbstractConfig {
                        UPGRADE_FROM_20,
                        UPGRADE_FROM_21,
                        UPGRADE_FROM_22,
-                       UPGRADE_FROM_23),
+                       UPGRADE_FROM_23,
+                       UPGRADE_FROM_24,
+                       UPGRADE_FROM_25,
+                       UPGRADE_FROM_26,
+                       UPGRADE_FROM_27,
+                       UPGRADE_FROM_28,
+                       UPGRADE_FROM_30,
+                       UPGRADE_FROM_31,
+                       UPGRADE_FROM_32),
                     Importance.LOW,
                     UPGRADE_FROM_DOC)
             .define(WINDOWED_INNER_CLASS_SERDE,
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index 3c88732063..2abe7f5386 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -1197,7 +1197,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
 
         final StreamPartitioner<K, SubscriptionResponseWrapper<VO>> foreignResponseSinkPartitioner =
             tableJoinedInternal.partitioner() == null
-                ? null
+                ? (topic, key, subscriptionResponseWrapper, numPartitions) -> subscriptionResponseWrapper.getPrimaryPartition()
                 : (topic, key, val, numPartitions) ->
                     tableJoinedInternal.partitioner().partition(topic, key, null, numPartitions);
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignJoinSubscriptionProcessorSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignJoinSubscriptionProcessorSupplier.java
index 63afb71a3c..55e40fce64 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignJoinSubscriptionProcessorSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignJoinSubscriptionProcessorSupplier.java
@@ -108,7 +108,10 @@ public class ForeignJoinSubscriptionProcessorSupplier<K, KO, VO> implements
                         final CombinedKey<KO, K> combinedKey = keySchema.fromBytes(next.key);
                         context().forward(
                             record.withKey(combinedKey.getPrimaryKey())
-                                .withValue(new SubscriptionResponseWrapper<>(next.value.value().getHash(), record.value().newValue))
+                                .withValue(new SubscriptionResponseWrapper<>(
+                                    next.value.value().getHash(),
+                                    record.value().newValue,
+                                    next.value.value().getPrimaryPartition()))
                         );
                     }
                 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignJoinSubscriptionSendProcessorSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignJoinSubscriptionSendProcessorSupplier.java
index 9394487c86..0efe4da2bc 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignJoinSubscriptionSendProcessorSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignJoinSubscriptionSendProcessorSupplier.java
@@ -103,6 +103,7 @@ public class ForeignJoinSubscriptionSendProcessorSupplier<K, KO, V> implements P
                 null :
                 Murmur3.hash128(valueSerializer.serialize(valueSerdeTopic, record.value().newValue));
 
+            final int partition = context().recordMetadata().get().partition();
             if (record.value().oldValue != null) {
                 final KO oldForeignKey = foreignKeyExtractor.apply(record.value().oldValue);
                 if (oldForeignKey == null) {
@@ -149,19 +150,34 @@ public class ForeignJoinSubscriptionSendProcessorSupplier<K, KO, V> implements P
                         //Delete it from the oldKey's state store
                         context().forward(
                             record.withKey(oldForeignKey)
-                                .withValue(new SubscriptionWrapper<>(currentHash, DELETE_KEY_NO_PROPAGATE, record.key())));
+                                .withValue(new SubscriptionWrapper<>(
+                                    currentHash,
+                                    DELETE_KEY_NO_PROPAGATE,
+                                    record.key(),
+                                    partition
+                                )));
                         //Add to the newKey's state store. Additionally, propagate null if no FK is found there,
                         //since we must "unset" any output set by the previous FK-join. This is true for both INNER
                         //and LEFT join.
                     }
                     context().forward(
                         record.withKey(newForeignKey)
-                            .withValue(new SubscriptionWrapper<>(currentHash, PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE, record.key())));
+                            .withValue(new SubscriptionWrapper<>(
+                                currentHash,
+                                PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE,
+                                record.key(),
+                                partition
+                            )));
                 } else {
                     //A simple propagatable delete. Delete from the state store and propagate the delete onwards.
                     context().forward(
                         record.withKey(oldForeignKey)
-                           .withValue(new SubscriptionWrapper<>(currentHash, DELETE_KEY_AND_PROPAGATE, record.key())));
+                           .withValue(new SubscriptionWrapper<>(
+                               currentHash,
+                               DELETE_KEY_AND_PROPAGATE,
+                               record.key(),
+                               partition
+                           )));
                 }
             } else if (record.value().newValue != null) {
                 //change.oldValue is null, which means it was deleted at least once before, or it is brand new.
@@ -193,7 +209,11 @@ public class ForeignJoinSubscriptionSendProcessorSupplier<K, KO, V> implements P
                 } else {
                     context().forward(
                         record.withKey(newForeignKey)
-                            .withValue(new SubscriptionWrapper<>(currentHash, instruction, record.key())));
+                            .withValue(new SubscriptionWrapper<>(
+                                currentHash,
+                                instruction,
+                                record.key(),
+                                partition)));
                 }
             }
         }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionJoinForeignProcessorSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionJoinForeignProcessorSupplier.java
index 4820beafe1..fea8e73867 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionJoinForeignProcessorSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionJoinForeignProcessorSupplier.java
@@ -88,7 +88,11 @@ public class SubscriptionJoinForeignProcessorSupplier<K, KO, VO>
                     case DELETE_KEY_AND_PROPAGATE:
                         context().forward(
                             record.withKey(record.key().getPrimaryKey())
-                                .withValue(new SubscriptionResponseWrapper<VO>(value.getHash(), null))
+                                .withValue(new SubscriptionResponseWrapper<VO>(
+                                    value.getHash(),
+                                    null,
+                                    value.getPrimaryPartition()
+                                ))
                                 .withTimestamp(resultTimestamp)
                         );
                         break;
@@ -100,7 +104,7 @@ public class SubscriptionJoinForeignProcessorSupplier<K, KO, VO>
 
                         context().forward(
                             record.withKey(record.key().getPrimaryKey())
-                                .withValue(new SubscriptionResponseWrapper<>(value.getHash(), valueToSend))
+                                .withValue(new SubscriptionResponseWrapper<>(value.getHash(), valueToSend, value.getPrimaryPartition()))
                                 .withTimestamp(resultTimestamp)
                         );
                         break;
@@ -108,7 +112,11 @@ public class SubscriptionJoinForeignProcessorSupplier<K, KO, VO>
                         if (foreignValueAndTime != null) {
                             context().forward(
                                 record.withKey(record.key().getPrimaryKey())
-                                   .withValue(new SubscriptionResponseWrapper<>(value.getHash(), foreignValueAndTime.value()))
+                                   .withValue(new SubscriptionResponseWrapper<>(
+                                       value.getHash(),
+                                       foreignValueAndTime.value(),
+                                       value.getPrimaryPartition()
+                                   ))
                                    .withTimestamp(resultTimestamp)
                             );
                         }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapper.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapper.java
index 9c79e46821..ef9a3a055b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapper.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapper.java
@@ -21,22 +21,30 @@ import org.apache.kafka.common.errors.UnsupportedVersionException;
 import java.util.Arrays;
 
 public class SubscriptionResponseWrapper<FV> {
-    final static byte CURRENT_VERSION = 0x00;
+    final static byte CURRENT_VERSION = 0;
+    // v0 fields:
     private final long[] originalValueHash;
     private final FV foreignValue;
     private final byte version;
+    // non-serializing fields
+    private final Integer primaryPartition;
 
-    public SubscriptionResponseWrapper(final long[] originalValueHash, final FV foreignValue) {
-        this(originalValueHash, foreignValue, CURRENT_VERSION);
+    public SubscriptionResponseWrapper(final long[] originalValueHash, final FV foreignValue, final Integer primaryPartition) {
+        this(originalValueHash, foreignValue, CURRENT_VERSION, primaryPartition);
     }
 
-    public SubscriptionResponseWrapper(final long[] originalValueHash, final FV foreignValue, final byte version) {
-        if (version != CURRENT_VERSION) {
+    public SubscriptionResponseWrapper(
+        final long[] originalValueHash,
+        final FV foreignValue,
+        final byte version,
+        final Integer primaryPartition) {
+        if (version < 0 || version > CURRENT_VERSION) {
             throw new UnsupportedVersionException("SubscriptionWrapper does not support version " + version);
         }
         this.originalValueHash = originalValueHash;
         this.foreignValue = foreignValue;
         this.version = version;
+        this.primaryPartition = primaryPartition;
     }
 
     public long[] getOriginalValueHash() {
@@ -51,12 +59,17 @@ public class SubscriptionResponseWrapper<FV> {
         return version;
     }
 
+    public Integer getPrimaryPartition() {
+        return primaryPartition;
+    }
+
     @Override
     public String toString() {
         return "SubscriptionResponseWrapper{" +
             "version=" + version +
             ", foreignValue=" + foreignValue +
             ", originalValueHash=" + Arrays.toString(originalValueHash) +
+            ", primaryPartition=" + primaryPartition +
             '}';
     }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapperSerde.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapperSerde.java
index 8910ff80c3..12a14e7cc4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapperSerde.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapperSerde.java
@@ -91,7 +91,6 @@ public class SubscriptionResponseWrapperSerde<V> implements Serde<SubscriptionRe
                 buf.put(serializedData);
             return buf.array();
         }
-
     }
 
     private static final class SubscriptionResponseWrapperDeserializer<V>
@@ -141,9 +140,7 @@ public class SubscriptionResponseWrapperSerde<V> implements Serde<SubscriptionRe
                 value = null;
             }
 
-            return new SubscriptionResponseWrapper<>(hash, value, version);
+            return new SubscriptionResponseWrapper<>(hash, value, version, null);
         }
-
     }
-
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapper.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapper.java
index a757895aec..a75a419cd7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapper.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapper.java
@@ -23,12 +23,15 @@ import java.util.Objects;
 
 
 public class SubscriptionWrapper<K> {
-    static final byte CURRENT_VERSION = 0;
+    static final byte CURRENT_VERSION = 1;
 
+    // v0 fields:
     private final long[] hash;
     private final Instruction instruction;
     private final byte version;
     private final K primaryKey;
+    // v1 fields:
+    private final Integer primaryPartition;
 
     public enum Instruction {
         //Send nothing. Do not propagate.
@@ -65,14 +68,14 @@ public class SubscriptionWrapper<K> {
         }
     }
 
-    public SubscriptionWrapper(final long[] hash, final Instruction instruction, final K primaryKey) {
-        this(hash, instruction, primaryKey, CURRENT_VERSION);
+    public SubscriptionWrapper(final long[] hash, final Instruction instruction, final K primaryKey, final Integer primaryPartition) {
+        this(hash, instruction, primaryKey, CURRENT_VERSION, primaryPartition);
     }
 
-    public SubscriptionWrapper(final long[] hash, final Instruction instruction, final K primaryKey, final byte version) {
+    public SubscriptionWrapper(final long[] hash, final Instruction instruction, final K primaryKey, final byte version, final Integer primaryPartition) {
         Objects.requireNonNull(instruction, "instruction cannot be null. Required by downstream processor.");
         Objects.requireNonNull(primaryKey, "primaryKey cannot be null. Required by downstream processor.");
-        if (version != CURRENT_VERSION) {
+        if (version < 0 || version > CURRENT_VERSION) {
             throw new UnsupportedVersionException("SubscriptionWrapper does not support version " + version);
         }
 
@@ -80,6 +83,7 @@ public class SubscriptionWrapper<K> {
         this.hash = hash;
         this.primaryKey = primaryKey;
         this.version = version;
+        this.primaryPartition = primaryPartition;
     }
 
     public Instruction getInstruction() {
@@ -98,6 +102,10 @@ public class SubscriptionWrapper<K> {
         return version;
     }
 
+    public Integer getPrimaryPartition() {
+        return primaryPartition;
+    }
+
     @Override
     public String toString() {
         return "SubscriptionWrapper{" +
@@ -105,6 +113,7 @@ public class SubscriptionWrapper<K> {
             ", primaryKey=" + primaryKey +
             ", instruction=" + instruction +
             ", hash=" + Arrays.toString(hash) +
+            ", primaryPartition=" + primaryPartition +
             '}';
     }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapperSerde.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapperSerde.java
index e71376216b..c04125495c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapperSerde.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapperSerde.java
@@ -16,10 +16,12 @@
  */
 package org.apache.kafka.streams.kstream.internals.foreignkeyjoin;
 
+import java.util.Map;
 import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.kstream.internals.WrappingNullableDeserializer;
 import org.apache.kafka.streams.kstream.internals.WrappingNullableSerde;
 import org.apache.kafka.streams.kstream.internals.WrappingNullableSerializer;
@@ -45,6 +47,7 @@ public class SubscriptionWrapperSerde<K> extends WrappingNullableSerde<Subscript
         private final Supplier<String> primaryKeySerializationPseudoTopicSupplier;
         private String primaryKeySerializationPseudoTopic = null;
         private Serializer<K> primaryKeySerializer;
+        private boolean upgradeFromV0 = false;
 
         SubscriptionWrapperSerializer(final Supplier<String> primaryKeySerializationPseudoTopicSupplier,
                                       final Serializer<K> primaryKeySerializer) {
@@ -60,34 +63,85 @@ public class SubscriptionWrapperSerde<K> extends WrappingNullableSerde<Subscript
             }
         }
 
+        @Override
+        public void configure(final Map<String, ?> configs, final boolean isKey) {
+            this.upgradeFromV0 = upgradeFromV0(configs);
+        }
+
+        private static boolean upgradeFromV0(final Map<String, ?> configs) {
+            final Object upgradeFrom = configs.get(StreamsConfig.UPGRADE_FROM_CONFIG);
+            if (upgradeFrom == null) {
+                return false;
+            }
+
+            switch ((String) upgradeFrom) {
+                case StreamsConfig.UPGRADE_FROM_0100:
+                case StreamsConfig.UPGRADE_FROM_0101:
+                case StreamsConfig.UPGRADE_FROM_0102:
+                case StreamsConfig.UPGRADE_FROM_0110:
+                case StreamsConfig.UPGRADE_FROM_10:
+                case StreamsConfig.UPGRADE_FROM_11:
+                case StreamsConfig.UPGRADE_FROM_20:
+                case StreamsConfig.UPGRADE_FROM_21:
+                case StreamsConfig.UPGRADE_FROM_22:
+                case StreamsConfig.UPGRADE_FROM_23:
+                case StreamsConfig.UPGRADE_FROM_24:
+                case StreamsConfig.UPGRADE_FROM_25:
+                case StreamsConfig.UPGRADE_FROM_26:
+                case StreamsConfig.UPGRADE_FROM_27:
+                case StreamsConfig.UPGRADE_FROM_28:
+                case StreamsConfig.UPGRADE_FROM_30:
+                case StreamsConfig.UPGRADE_FROM_31:
+                case StreamsConfig.UPGRADE_FROM_32:
+                    return true;
+                default:
+                    return false;
+            }
+        }
+
         @Override
         public byte[] serialize(final String ignored, final SubscriptionWrapper<K> data) {
-            //{1-bit-isHashNull}{7-bits-version}{1-byte-instruction}{Optional-16-byte-Hash}{PK-serialized}
+            //{1-bit-isHashNull}{7-bits-version}{1-byte-instruction}{Optional-16-byte-Hash}{PK-serialized}{4-bytes-primaryPartition}
 
             //7-bit (0x7F) maximum for data version.
             if (Byte.compare((byte) 0x7F, data.getVersion()) < 0) {
                 throw new UnsupportedVersionException("SubscriptionWrapper version is larger than maximum supported 0x7F");
             }
 
+            final int version = data.getVersion();
+            if (upgradeFromV0 || version == 0) {
+                return serializeV0(data);
+            } else if (version == 1) {
+                return serializeV1(data);
+            } else {
+                throw new UnsupportedVersionException("Unsupported SubscriptionWrapper version " + data.getVersion());
+            }
+        }
+
+        private byte[] serializePrimaryKey(final SubscriptionWrapper<K> data) {
             if (primaryKeySerializationPseudoTopic == null) {
                 primaryKeySerializationPseudoTopic = primaryKeySerializationPseudoTopicSupplier.get();
             }
 
-            final byte[] primaryKeySerializedData = primaryKeySerializer.serialize(
+            return  primaryKeySerializer.serialize(
                 primaryKeySerializationPseudoTopic,
                 data.getPrimaryKey()
             );
+        }
 
+        private ByteBuffer serializeCommon(final SubscriptionWrapper<K> data, final byte version, final int extraLength) {
+            final byte[] primaryKeySerializedData = serializePrimaryKey(data);
             final ByteBuffer buf;
+            int dataLength = 2 + primaryKeySerializedData.length + extraLength;
             if (data.getHash() != null) {
-                buf = ByteBuffer.allocate(2 + 2 * Long.BYTES + primaryKeySerializedData.length);
-                buf.put(data.getVersion());
+                dataLength += 2 * Long.BYTES;
+                buf = ByteBuffer.allocate(dataLength);
+                buf.put(version);
             } else {
                 //Don't store hash as it's null.
-                buf = ByteBuffer.allocate(2 + primaryKeySerializedData.length);
-                buf.put((byte) (data.getVersion() | (byte) 0x80));
+                buf = ByteBuffer.allocate(dataLength);
+                buf.put((byte) (version | (byte) 0x80));
             }
-
             buf.put(data.getInstruction().getValue());
             final long[] elem = data.getHash();
             if (data.getHash() != null) {
@@ -95,9 +149,18 @@ public class SubscriptionWrapperSerde<K> extends WrappingNullableSerde<Subscript
                 buf.putLong(elem[1]);
             }
             buf.put(primaryKeySerializedData);
-            return buf.array();
+            return buf;
+        }
+
+        private byte[] serializeV0(final SubscriptionWrapper<K> data) {
+            return serializeCommon(data, (byte) 0, 0).array(); // always stamps version byte 0 and reserves no trailing bytes, whatever data.getVersion() is
         }
 
+        private byte[] serializeV1(final SubscriptionWrapper<K> data) {
+            final ByteBuffer buf = serializeCommon(data, data.getVersion(), Integer.BYTES); // Integer.BYTES of spare capacity for the trailing partition
+            buf.putInt(data.getPrimaryPartition()); // written after the primary-key bytes; putInt takes an int, so a null partition fails here
+            return buf.array();
+        }
     }
 
     private static class SubscriptionWrapperDeserializer<K>
@@ -123,15 +186,15 @@ public class SubscriptionWrapperSerde<K> extends WrappingNullableSerde<Subscript
 
         @Override
         public SubscriptionWrapper<K> deserialize(final String ignored, final byte[] data) {
-            //{7-bits-version}{1-bit-isHashNull}{1-byte-instruction}{Optional-16-byte-Hash}{PK-serialized}
+            //{7-bits-version}{1-bit-isHashNull}{1-byte-instruction}{Optional-16-byte-Hash}{PK-serialized}{Optional-4-bytes-primaryPartition (version > 0 only)}
             final ByteBuffer buf = ByteBuffer.wrap(data);
             final byte versionAndIsHashNull = buf.get();
             final byte version = (byte) (0x7F & versionAndIsHashNull);
             final boolean isHashNull = (0x80 & versionAndIsHashNull) == 0x80;
             final SubscriptionWrapper.Instruction inst = SubscriptionWrapper.Instruction.fromValue(buf.get());
 
-            final long[] hash;
             int lengthSum = 2; //The first 2 bytes
+            final long[] hash;
             if (isHashNull) {
                 hash = null;
             } else {
@@ -141,17 +204,31 @@ public class SubscriptionWrapperSerde<K> extends WrappingNullableSerde<Subscript
                 lengthSum += 2 * Long.BYTES;
             }
 
-            final byte[] primaryKeyRaw = new byte[data.length - lengthSum]; //The remaining data is the serialized pk
-            buf.get(primaryKeyRaw, 0, primaryKeyRaw.length);
+            final int primaryKeyLength;
+            if (version > 0) {
+                primaryKeyLength = data.length - lengthSum - Integer.BYTES;
+            } else {
+                primaryKeyLength = data.length - lengthSum;
+            }
+            final byte[] primaryKeyRaw = new byte[primaryKeyLength];
+            buf.get(primaryKeyRaw, 0, primaryKeyLength);
 
             if (primaryKeySerializationPseudoTopic == null) {
                 primaryKeySerializationPseudoTopic = primaryKeySerializationPseudoTopicSupplier.get();
             }
 
-            final K primaryKey = primaryKeyDeserializer.deserialize(primaryKeySerializationPseudoTopic,
-                                                                    primaryKeyRaw);
+            final K primaryKey = primaryKeyDeserializer.deserialize(
+                primaryKeySerializationPseudoTopic,
+                primaryKeyRaw
+            );
+            final Integer primaryPartition;
+            if (version > 0) {
+                primaryPartition = buf.getInt();
+            } else {
+                primaryPartition = null;
+            }
 
-            return new SubscriptionWrapper<>(hash, inst, primaryKey, version);
+            return new SubscriptionWrapper<>(hash, inst, primaryKey, version, primaryPartition);
         }
 
     }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfiguration.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfiguration.java
index 5e317efdba..4a3d46cfc3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfiguration.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfiguration.java
@@ -114,6 +114,17 @@ public final class AssignorConfiguration {
                     log.warn("The eager rebalancing protocol is deprecated and will stop being supported in a future release." +
                         " Please be prepared to remove the 'upgrade.from' config soon.");
                     return RebalanceProtocol.EAGER;
+                case StreamsConfig.UPGRADE_FROM_24:
+                case StreamsConfig.UPGRADE_FROM_25:
+                case StreamsConfig.UPGRADE_FROM_26:
+                case StreamsConfig.UPGRADE_FROM_27:
+                case StreamsConfig.UPGRADE_FROM_28:
+                case StreamsConfig.UPGRADE_FROM_30:
+                case StreamsConfig.UPGRADE_FROM_31:
+                case StreamsConfig.UPGRADE_FROM_32:
+                    // This config is for explicitly sending FK response to a requested partition
+                    // and should not affect the rebalance protocol
+                    break;
                 default:
                     throw new IllegalArgumentException("Unknown configuration value for parameter 'upgrade.from': " + upgradeFrom);
             }
@@ -157,6 +168,17 @@ public final class AssignorConfiguration {
                 case StreamsConfig.UPGRADE_FROM_23:
                     // These configs are for cooperative rebalancing and should not affect the metadata version
                     break;
+                case StreamsConfig.UPGRADE_FROM_24:
+                case StreamsConfig.UPGRADE_FROM_25:
+                case StreamsConfig.UPGRADE_FROM_26:
+                case StreamsConfig.UPGRADE_FROM_27:
+                case StreamsConfig.UPGRADE_FROM_28:
+                case StreamsConfig.UPGRADE_FROM_30:
+                case StreamsConfig.UPGRADE_FROM_31:
+                case StreamsConfig.UPGRADE_FROM_32:
+                    // This config is for explicitly sending FK response to a requested partition
+                    // and should not affect the metadata version
+                    break;
                 default:
                     throw new IllegalArgumentException(
                         "Unknown configuration value for parameter 'upgrade.from': " + upgradeFrom
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResolverJoinProcessorSupplierTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResolverJoinProcessorSupplierTest.java
index 6c7c0972b0..dd794b0107 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResolverJoinProcessorSupplierTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResolverJoinProcessorSupplierTest.java
@@ -89,7 +89,7 @@ public class SubscriptionResolverJoinProcessorSupplierTest {
 
         valueGetterSupplier.put("lhs1", "lhsValue");
         final long[] oldHash = Murmur3.hash128(STRING_SERIALIZER.serialize("topic-join-resolver", "oldLhsValue"));
-        processor.process(new Record<>("lhs1", new SubscriptionResponseWrapper<>(oldHash, "rhsValue"), 0));
+        processor.process(new Record<>("lhs1", new SubscriptionResponseWrapper<>(oldHash, "rhsValue", 0), 0));
         final List<MockProcessorContext.CapturedForward<? extends String, ? extends String>> forwarded = context.forwarded();
         assertThat(forwarded, empty());
     }
@@ -114,7 +114,7 @@ public class SubscriptionResolverJoinProcessorSupplierTest {
 
         valueGetterSupplier.put("lhs1", null);
         final long[] hash = Murmur3.hash128(STRING_SERIALIZER.serialize("topic-join-resolver", "lhsValue"));
-        processor.process(new Record<>("lhs1", new SubscriptionResponseWrapper<>(hash, "rhsValue"), 0));
+        processor.process(new Record<>("lhs1", new SubscriptionResponseWrapper<>(hash, "rhsValue", 0), 0));
         final List<MockProcessorContext.CapturedForward<? extends String, ? extends String>> forwarded = context.forwarded();
         assertThat(forwarded, empty());
     }
@@ -139,7 +139,7 @@ public class SubscriptionResolverJoinProcessorSupplierTest {
 
         valueGetterSupplier.put("lhs1", "lhsValue");
         final long[] hash = Murmur3.hash128(STRING_SERIALIZER.serialize("topic-join-resolver", "lhsValue"));
-        processor.process(new Record<>("lhs1", new SubscriptionResponseWrapper<>(hash, "rhsValue"), 0));
+        processor.process(new Record<>("lhs1", new SubscriptionResponseWrapper<>(hash, "rhsValue", 0), 0));
         final List<MockProcessorContext.CapturedForward<? extends String, ? extends String>> forwarded = context.forwarded();
         assertThat(forwarded.size(), is(1));
         assertThat(forwarded.get(0).record(), is(new Record<>("lhs1", "(lhsValue,rhsValue)", 0)));
@@ -165,7 +165,7 @@ public class SubscriptionResolverJoinProcessorSupplierTest {
 
         valueGetterSupplier.put("lhs1", "lhsValue");
         final long[] hash = Murmur3.hash128(STRING_SERIALIZER.serialize("topic-join-resolver", "lhsValue"));
-        processor.process(new Record<>("lhs1", new SubscriptionResponseWrapper<>(hash, null), 0));
+        processor.process(new Record<>("lhs1", new SubscriptionResponseWrapper<>(hash, null, 0), 0));
         final List<MockProcessorContext.CapturedForward<? extends String, ? extends String>> forwarded = context.forwarded();
         assertThat(forwarded.size(), is(1));
         assertThat(forwarded.get(0).record(), is(new Record<>("lhs1", null, 0)));
@@ -191,7 +191,7 @@ public class SubscriptionResolverJoinProcessorSupplierTest {
 
         valueGetterSupplier.put("lhs1", "lhsValue");
         final long[] hash = Murmur3.hash128(STRING_SERIALIZER.serialize("topic-join-resolver", "lhsValue"));
-        processor.process(new Record<>("lhs1", new SubscriptionResponseWrapper<>(hash, null), 0));
+        processor.process(new Record<>("lhs1", new SubscriptionResponseWrapper<>(hash, null, 0), 0));
         final List<MockProcessorContext.CapturedForward<? extends String, ? extends String>> forwarded = context.forwarded();
         assertThat(forwarded.size(), is(1));
         assertThat(forwarded.get(0).record(), is(new Record<>("lhs1", "(lhsValue,null)", 0)));
@@ -217,7 +217,7 @@ public class SubscriptionResolverJoinProcessorSupplierTest {
 
         valueGetterSupplier.put("lhs1", null);
         final long[] hash = null;
-        processor.process(new Record<>("lhs1", new SubscriptionResponseWrapper<>(hash, null), 0));
+        processor.process(new Record<>("lhs1", new SubscriptionResponseWrapper<>(hash, null, 0), 0));
         final List<MockProcessorContext.CapturedForward<? extends String, ? extends String>> forwarded = context.forwarded();
         assertThat(forwarded.size(), is(1));
         assertThat(forwarded.get(0).record(), is(new Record<>("lhs1", null, 0)));
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapperSerdeTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapperSerdeTest.java
index 30fc0c3185..167c1f990f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapperSerdeTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapperSerdeTest.java
@@ -76,26 +76,28 @@ public class SubscriptionResponseWrapperSerdeTest {
     public void ShouldSerdeWithNonNullsTest() {
         final long[] hashedValue = Murmur3.hash128(new byte[] {(byte) 0x01, (byte) 0x9A, (byte) 0xFF, (byte) 0x00});
         final String foreignValue = "foreignValue";
-        final SubscriptionResponseWrapper<String> srw = new SubscriptionResponseWrapper<>(hashedValue, foreignValue);
+        final SubscriptionResponseWrapper<String> srw = new SubscriptionResponseWrapper<>(hashedValue, foreignValue, 1);
         final SubscriptionResponseWrapperSerde<String> srwSerde = new SubscriptionResponseWrapperSerde(new NonNullableSerde(Serdes.String()));
         final byte[] serResponse = srwSerde.serializer().serialize(null, srw);
         final SubscriptionResponseWrapper<String> result = srwSerde.deserializer().deserialize(null, serResponse);
 
         assertArrayEquals(hashedValue, result.getOriginalValueHash());
         assertEquals(foreignValue, result.getForeignValue());
+        assertNull(result.getPrimaryPartition());
     }
 
     @Test
     @SuppressWarnings("unchecked")
     public void shouldSerdeWithNullForeignValueTest() {
         final long[] hashedValue = Murmur3.hash128(new byte[] {(byte) 0x01, (byte) 0x9A, (byte) 0xFF, (byte) 0x00});
-        final SubscriptionResponseWrapper<String> srw = new SubscriptionResponseWrapper<>(hashedValue, null);
+        final SubscriptionResponseWrapper<String> srw = new SubscriptionResponseWrapper<>(hashedValue, null, 1);
         final SubscriptionResponseWrapperSerde<String> srwSerde = new SubscriptionResponseWrapperSerde(new NonNullableSerde(Serdes.String()));
         final byte[] serResponse = srwSerde.serializer().serialize(null, srw);
         final SubscriptionResponseWrapper<String> result = srwSerde.deserializer().deserialize(null, serResponse);
 
         assertArrayEquals(hashedValue, result.getOriginalValueHash());
         assertNull(result.getForeignValue());
+        assertNull(result.getPrimaryPartition());
     }
 
     @Test
@@ -103,13 +105,14 @@ public class SubscriptionResponseWrapperSerdeTest {
     public void shouldSerdeWithNullHashTest() {
         final long[] hashedValue = null;
         final String foreignValue = "foreignValue";
-        final SubscriptionResponseWrapper<String> srw = new SubscriptionResponseWrapper<>(hashedValue, foreignValue);
+        final SubscriptionResponseWrapper<String> srw = new SubscriptionResponseWrapper<>(hashedValue, foreignValue, 1);
         final SubscriptionResponseWrapperSerde<String> srwSerde = new SubscriptionResponseWrapperSerde(new NonNullableSerde(Serdes.String()));
         final byte[] serResponse = srwSerde.serializer().serialize(null, srw);
         final SubscriptionResponseWrapper<String> result = srwSerde.deserializer().deserialize(null, serResponse);
 
         assertArrayEquals(hashedValue, result.getOriginalValueHash());
         assertEquals(foreignValue, result.getForeignValue());
+        assertNull(result.getPrimaryPartition());
     }
 
     @Test
@@ -117,19 +120,20 @@ public class SubscriptionResponseWrapperSerdeTest {
     public void shouldSerdeWithNullsTest() {
         final long[] hashedValue = null;
         final String foreignValue = null;
-        final SubscriptionResponseWrapper<String> srw = new SubscriptionResponseWrapper<>(hashedValue, foreignValue);
+        final SubscriptionResponseWrapper<String> srw = new SubscriptionResponseWrapper<>(hashedValue, foreignValue, 1);
         final SubscriptionResponseWrapperSerde<String> srwSerde = new SubscriptionResponseWrapperSerde(new NonNullableSerde(Serdes.String()));
         final byte[] serResponse = srwSerde.serializer().serialize(null, srw);
         final SubscriptionResponseWrapper<String> result = srwSerde.deserializer().deserialize(null, serResponse);
 
         assertArrayEquals(hashedValue, result.getOriginalValueHash());
         assertEquals(foreignValue, result.getForeignValue());
+        assertNull(result.getPrimaryPartition());
     }
 
     @Test
     public void shouldThrowExceptionWithBadVersionTest() {
         final long[] hashedValue = null;
         assertThrows(UnsupportedVersionException.class,
-            () -> new SubscriptionResponseWrapper<>(hashedValue, "foreignValue", (byte) 0xFF));
+            () -> new SubscriptionResponseWrapper<>(hashedValue, "foreignValue", (byte) 0xFF, 1));
     }
-}
+}
\ No newline at end of file
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapperSerdeTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapperSerdeTest.java
index e937efe2bc..8cd26d606d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapperSerdeTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapperSerdeTest.java
@@ -16,13 +16,16 @@
  */
 package org.apache.kafka.streams.kstream.internals.foreignkeyjoin;
 
+import java.util.Collections;
 import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.state.internals.Murmur3;
 import org.junit.Test;
 
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertThrows;
 
 @SuppressWarnings({"unchecked", "rawtypes"})
@@ -30,53 +33,229 @@ public class SubscriptionWrapperSerdeTest {
 
     @Test
     @SuppressWarnings("unchecked")
-    public void shouldSerdeTest() {
+    public void shouldSerdeV0Test() {
+        final byte version = 0;
         final String originalKey = "originalKey";
         final SubscriptionWrapperSerde swSerde = new SubscriptionWrapperSerde<>(() -> "pkTopic", Serdes.String());
         final long[] hashedValue = Murmur3.hash128(new byte[] {(byte) 0xFF, (byte) 0xAA, (byte) 0x00, (byte) 0x19});
-        final SubscriptionWrapper wrapper = new SubscriptionWrapper<>(hashedValue, SubscriptionWrapper.Instruction.DELETE_KEY_AND_PROPAGATE, originalKey);
+        final Integer primaryPartition = null;
+        final SubscriptionWrapper wrapper = new SubscriptionWrapper<>(
+            hashedValue,
+            SubscriptionWrapper.Instruction.DELETE_KEY_AND_PROPAGATE,
+            originalKey,
+            version,
+            primaryPartition);
         final byte[] serialized = swSerde.serializer().serialize(null, wrapper);
-        final SubscriptionWrapper deserialized = (SubscriptionWrapper) swSerde.deserializer().deserialize(null, serialized);
+        final SubscriptionWrapper deserialized = (SubscriptionWrapper) swSerde.deserializer()
+            .deserialize(null, serialized);
 
         assertEquals(SubscriptionWrapper.Instruction.DELETE_KEY_AND_PROPAGATE, deserialized.getInstruction());
         assertArrayEquals(hashedValue, deserialized.getHash());
         assertEquals(originalKey, deserialized.getPrimaryKey());
+        assertEquals(primaryPartition, deserialized.getPrimaryPartition());
+        assertEquals(version, deserialized.getVersion());
+    }
+
+    @Test
+    @SuppressWarnings("unchecked")
+    public void shouldSerdeV1Test() {
+        final byte version = 1;
+        final String originalKey = "originalKey";
+        final SubscriptionWrapperSerde swSerde = new SubscriptionWrapperSerde<>(() -> "pkTopic", Serdes.String());
+        final long[] hashedValue = Murmur3.hash128(new byte[] {(byte) 0xFF, (byte) 0xAA, (byte) 0x00, (byte) 0x19});
+        final Integer primaryPartition = 10;
+        final SubscriptionWrapper wrapper = new SubscriptionWrapper<>(
+            hashedValue,
+            SubscriptionWrapper.Instruction.DELETE_KEY_AND_PROPAGATE,
+            originalKey,
+            version,
+            primaryPartition);
+        final byte[] serialized = swSerde.serializer().serialize(null, wrapper);
+        final SubscriptionWrapper deserialized = (SubscriptionWrapper) swSerde.deserializer()
+            .deserialize(null, serialized);
+
+        assertEquals(SubscriptionWrapper.Instruction.DELETE_KEY_AND_PROPAGATE, deserialized.getInstruction());
+        assertArrayEquals(hashedValue, deserialized.getHash());
+        assertEquals(originalKey, deserialized.getPrimaryKey());
+        assertEquals(primaryPartition, deserialized.getPrimaryPartition());
+        assertEquals(version, deserialized.getVersion());
+    }
+
+    @Test
+    @SuppressWarnings("unchecked")
+    public void shouldSerdeWithV0IfUpgradeTest() {
+        final byte version = 1; // the wrapper itself is built as V1
+        final String originalKey = "originalKey";
+        final SubscriptionWrapperSerde swSerde = new SubscriptionWrapperSerde<>(() -> "pkTopic", Serdes.String());
+        swSerde.configure( // upgrade.from is set, so the serializer is expected to fall back to the V0 wire format
+            Collections.singletonMap(StreamsConfig.UPGRADE_FROM_CONFIG, StreamsConfig.UPGRADE_FROM_32),
+            true);
+        final long[] hashedValue = Murmur3.hash128(new byte[] {(byte) 0xFF, (byte) 0xAA, (byte) 0x00, (byte) 0x19});
+        final Integer primaryPartition = 10; // non-null on purpose: it must not survive the round trip
+        final SubscriptionWrapper wrapper = new SubscriptionWrapper<>(
+            hashedValue,
+            SubscriptionWrapper.Instruction.DELETE_KEY_AND_PROPAGATE,
+            originalKey,
+            version,
+            primaryPartition);
+        final byte[] serialized = swSerde.serializer().serialize(null, wrapper);
+        final SubscriptionWrapper deserialized = (SubscriptionWrapper) swSerde.deserializer()
+            .deserialize(null, serialized);
+
+        assertEquals(SubscriptionWrapper.Instruction.DELETE_KEY_AND_PROPAGATE, deserialized.getInstruction());
+        assertArrayEquals(hashedValue, deserialized.getHash());
+        assertEquals(originalKey, deserialized.getPrimaryKey());
+        assertEquals(0, deserialized.getVersion()); // wire version is 0 even though the wrapper was V1
+        assertNull(deserialized.getPrimaryPartition()); // the deserializer reads no partition from a version-0 payload
+    }
+
+    @Test
+    @SuppressWarnings("unchecked")
+    public void shouldSerdeNullHashV0Test() {
+        final byte version = 0;
+        final String originalKey = "originalKey";
+        final SubscriptionWrapperSerde swSerde = new SubscriptionWrapperSerde<>(() -> "pkTopic", Serdes.String());
+        final long[] hashedValue = null;
+        final Integer primaryPartition = null;
+        final SubscriptionWrapper wrapper = new SubscriptionWrapper<>(
+            hashedValue,
+            SubscriptionWrapper.Instruction.PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE,
+            originalKey,
+            version,
+            primaryPartition);
+        final byte[] serialized = swSerde.serializer().serialize(null, wrapper);
+        final SubscriptionWrapper deserialized = (SubscriptionWrapper) swSerde.deserializer().deserialize(null, serialized);
+
+        assertEquals(SubscriptionWrapper.Instruction.PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE, deserialized.getInstruction());
+        assertArrayEquals(hashedValue, deserialized.getHash());
+        assertEquals(originalKey, deserialized.getPrimaryKey());
+        assertEquals(primaryPartition, deserialized.getPrimaryPartition());
+        assertEquals(version, deserialized.getVersion());
     }
 
     @Test
     @SuppressWarnings("unchecked")
-    public void shouldSerdeNullHashTest() {
+    public void shouldSerdeNullHashV1Test() {
+        final byte version = 1;
         final String originalKey = "originalKey";
         final SubscriptionWrapperSerde swSerde = new SubscriptionWrapperSerde<>(() -> "pkTopic", Serdes.String());
         final long[] hashedValue = null;
-        final SubscriptionWrapper wrapper = new SubscriptionWrapper<>(hashedValue, SubscriptionWrapper.Instruction.PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE, originalKey);
+        final Integer primaryPartition = 10;
+        final SubscriptionWrapper wrapper = new SubscriptionWrapper<>(
+            hashedValue,
+            SubscriptionWrapper.Instruction.PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE,
+            originalKey,
+            version,
+            primaryPartition);
+        final byte[] serialized = swSerde.serializer().serialize(null, wrapper);
+        final SubscriptionWrapper deserialized = (SubscriptionWrapper) swSerde.deserializer()
+            .deserialize(null, serialized);
+
+        assertEquals(SubscriptionWrapper.Instruction.PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE, deserialized.getInstruction());
+        assertArrayEquals(hashedValue, deserialized.getHash());
+        assertEquals(originalKey, deserialized.getPrimaryKey());
+        assertEquals(primaryPartition, deserialized.getPrimaryPartition());
+        assertEquals(version, deserialized.getVersion());
+    }
+
+    @Test
+    public void shouldSerdeNullPrimaryPartitionOnV0Test() {
+        final String originalKey = "originalKey";
+        final SubscriptionWrapperSerde swSerde = new SubscriptionWrapperSerde<>(() -> "pkTopic", Serdes.String());
+        final long[] hashedValue = Murmur3.hash128(new byte[] {(byte) 0xFF, (byte) 0xAA, (byte) 0x00, (byte) 0x19});
+        final Integer primaryPartition = null;
+        final byte version = 0;
+        final SubscriptionWrapper wrapper = new SubscriptionWrapper<>(
+            hashedValue,
+            SubscriptionWrapper.Instruction.PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE,
+            originalKey,
+            version,
+            primaryPartition);
         final byte[] serialized = swSerde.serializer().serialize(null, wrapper);
         final SubscriptionWrapper deserialized = (SubscriptionWrapper) swSerde.deserializer().deserialize(null, serialized);
 
         assertEquals(SubscriptionWrapper.Instruction.PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE, deserialized.getInstruction());
         assertArrayEquals(hashedValue, deserialized.getHash());
         assertEquals(originalKey, deserialized.getPrimaryKey());
+        assertEquals(primaryPartition, deserialized.getPrimaryPartition());
+        assertEquals(version, deserialized.getVersion());
     }
 
     @Test
-    public void shouldThrowExceptionOnNullKeyTest() {
+    public void shouldThrowExceptionOnNullKeyV0Test() {
         final String originalKey = null;
         final long[] hashedValue = Murmur3.hash128(new byte[] {(byte) 0xFF, (byte) 0xAA, (byte) 0x00, (byte) 0x19});
+        final Integer primaryPartition = 10;
         assertThrows(NullPointerException.class, () -> new SubscriptionWrapper<>(hashedValue,
-            SubscriptionWrapper.Instruction.PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE, originalKey));
+            SubscriptionWrapper.Instruction.PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE,
+            originalKey,
+            (byte) 0,
+            primaryPartition));
     }
 
     @Test
-    public void shouldThrowExceptionOnNullInstructionTest() {
+    public void shouldThrowExceptionOnNullKeyV1Test() {
+        final String originalKey = null;
+        final long[] hashedValue = Murmur3.hash128(new byte[] {(byte) 0xFF, (byte) 0xAA, (byte) 0x00, (byte) 0x19});
+        final Integer primaryPartition = 10;
+        assertThrows(NullPointerException.class, () -> new SubscriptionWrapper<>(hashedValue,
+            SubscriptionWrapper.Instruction.PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE,
+            originalKey,
+            (byte) 1,
+            primaryPartition));
+    }
+
+    @Test
+    public void shouldThrowExceptionOnNullInstructionV0Test() {
+        final String originalKey = "originalKey";
+        final long[] hashedValue = Murmur3.hash128(new byte[] {(byte) 0xFF, (byte) 0xAA, (byte) 0x00, (byte) 0x19});
+        final Integer primaryPartition = 10;
+        assertThrows(NullPointerException.class, () -> new SubscriptionWrapper<>(
+            hashedValue,
+            null,
+            originalKey,
+            (byte) 0,
+            primaryPartition));
+    }
+
+    @Test
+    public void shouldThrowExceptionOnNullInstructionV1Test() {
+        final String originalKey = "originalKey";
+        final long[] hashedValue = Murmur3.hash128(new byte[] {(byte) 0xFF, (byte) 0xAA, (byte) 0x00, (byte) 0x19});
+        final Integer primaryPartition = 10;
+        assertThrows(NullPointerException.class, () -> new SubscriptionWrapper<>(
+            hashedValue,
+            null, // instruction under test: a null instruction must be rejected
+            originalKey,
+            (byte) 1, // version under test: V1
+            primaryPartition));
+    }
+
+    @Test
+    public void shouldThrowExceptionOnNullPrimaryPartitionV1Test() {
+        final SubscriptionWrapperSerde swSerde = new SubscriptionWrapperSerde<>(() -> "pkTopic", Serdes.String());
         final String originalKey = "originalKey";
         final long[] hashedValue = Murmur3.hash128(new byte[] {(byte) 0xFF, (byte) 0xAA, (byte) 0x00, (byte) 0x19});
-        assertThrows(NullPointerException.class, () -> new SubscriptionWrapper<>(hashedValue, null, originalKey));
+        final Integer primaryPartition = null;
+        final SubscriptionWrapper wrapper = new SubscriptionWrapper<>(
+            hashedValue,
+            SubscriptionWrapper.Instruction.PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE,
+            originalKey,
+            (byte) 1,
+            primaryPartition);
+        assertThrows(NullPointerException.class, () -> swSerde.serializer().serialize(null, wrapper));
     }
 
     @Test (expected = UnsupportedVersionException.class)
     public void shouldThrowExceptionOnUnsupportedVersionTest() {
         final String originalKey = "originalKey";
         final long[] hashedValue = null;
-        new SubscriptionWrapper<>(hashedValue, SubscriptionWrapper.Instruction.PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE, originalKey, (byte) 0x80);
+        final Integer primaryPartition = 10;
+        new SubscriptionWrapper<>(
+            hashedValue,
+            SubscriptionWrapper.Instruction.PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE,
+            originalKey,
+            (byte) 0x80,
+            primaryPartition);
     }
 }