You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by vv...@apache.org on 2022/07/27 18:58:37 UTC

[kafka] branch trunk updated: KAFKA-13769: Add tests for ForeignJoinSubscriptionProcessorSupplier (#12437)

This is an automated email from the ASF dual-hosted git repository.

vvcephei pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new d076b7ad0e KAFKA-13769: Add tests for ForeignJoinSubscriptionProcessorSupplier (#12437)
d076b7ad0e is described below

commit d076b7ad0ee3a2dba710ff79d6c4dd5840766e9f
Author: Alex Sorokoumov <91...@users.noreply.github.com>
AuthorDate: Wed Jul 27 20:58:12 2022 +0200

    KAFKA-13769: Add tests for ForeignJoinSubscriptionProcessorSupplier (#12437)
    
    Reviewers: Adam Bellemare <ad...@gmail.com>, John Roesler <vv...@apache.org>
---
 .../SubscriptionResponseWrapper.java               |  24 ++
 .../foreignkeyjoin/SubscriptionWrapper.java        |   5 +-
 ...reignJoinSubscriptionProcessorSupplierTest.java | 377 +++++++++++++++++++++
 .../SubscriptionWrapperSerdeTest.java              |  22 +-
 4 files changed, 416 insertions(+), 12 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapper.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapper.java
index ef9a3a055b..99556f16e4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapper.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapper.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.kstream.internals.foreignkeyjoin;
 
+import java.util.Objects;
 import org.apache.kafka.common.errors.UnsupportedVersionException;
 
 import java.util.Arrays;
@@ -72,4 +73,27 @@ public class SubscriptionResponseWrapper<FV> {
             ", primaryPartition=" + primaryPartition +
             '}';
     }
+
+    @Override
+    public boolean equals(final Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        final SubscriptionResponseWrapper<?> other = (SubscriptionResponseWrapper<?>) o;
+        // The value hash is an array, so it is compared element-wise rather than by reference.
+        return version == other.version &&
+               Arrays.equals(originalValueHash, other.originalValueHash) &&
+               Objects.equals(foreignValue, other.foreignValue) &&
+               Objects.equals(primaryPartition, other.primaryPartition);
+    }
+
+    @Override
+    public int hashCode() {
+        // Arrays.hashCode keeps the hash consistent with the content-based array comparison in equals().
+        final int scalarFieldsHash = Objects.hash(foreignValue, version, primaryPartition);
+        return 31 * scalarFieldsHash + Arrays.hashCode(originalValueHash);
+    }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapper.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapper.java
index a75a419cd7..41d5f1198e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapper.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapper.java
@@ -23,7 +23,10 @@ import java.util.Objects;
 
 
 public class SubscriptionWrapper<K> {
-    static final byte CURRENT_VERSION = 1;
+    static final byte VERSION_0 = 0;
+    static final byte VERSION_1 = 1;
+
+    static final byte CURRENT_VERSION = VERSION_1;
 
     // v0 fields:
     private final long[] hash;
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignJoinSubscriptionProcessorSupplierTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignJoinSubscriptionProcessorSupplierTest.java
new file mode 100644
index 0000000000..1bf708b8ab
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignJoinSubscriptionProcessorSupplierTest.java
@@ -0,0 +1,377 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals.foreignkeyjoin;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import org.apache.kafka.streams.kstream.internals.Change;
+import org.apache.kafka.streams.kstream.internals.KTableValueGetter;
+import org.apache.kafka.streams.kstream.internals.KTableValueGetterSupplier;
+import org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionWrapper.Instruction;
+import org.apache.kafka.streams.processor.api.MockProcessorContext;
+import org.apache.kafka.streams.processor.api.MockProcessorContext.CapturedForward;
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class ForeignJoinSubscriptionProcessorSupplierTest {
+    // Foreign-key table contents: "fk1" -> "foo" is the only entry the value getter can resolve.
+    final Map<String, ValueAndTimestamp<String>> fks =
+        Collections.singletonMap("fk1", ValueAndTimestamp.make("foo", 1L));
+    final KTableValueGetterSupplier<String, String> valueGetterSupplier = valueGetterSupplier(fks);
+    // Processor under test, created once per test-class instance from the getter supplier above.
+    final Processor<
+        CombinedKey<String, String>, Change<ValueAndTimestamp<SubscriptionWrapper<String>>>,
+        String, SubscriptionResponseWrapper<String>>
+        processor = processor(valueGetterSupplier);
+
+    @Test
+    public void shouldDetectVersionChange() {
+        // Guard: fails as soon as CURRENT_VERSION is no longer VERSION_1, as a reminder to add tests for the new SubscriptionWrapper version.
+        Assert.assertEquals(SubscriptionWrapper.VERSION_1, SubscriptionWrapper.CURRENT_VERSION);
+    }
+
+    /**
+     * A version-0 DELETE_KEY_AND_PROPAGATE subscription must produce exactly one response for "pk1"
+     * that echoes the subscription hash and carries neither a foreign value nor a primary partition.
+     */
+    @Test
+    public void shouldDeleteKeyAndPropagateFKV0() {
+        final MockProcessorContext<String, SubscriptionResponseWrapper<String>> mockContext =
+            new MockProcessorContext<>();
+        processor.init(mockContext);
+
+        // Version 0 subscription, created without a primary partition.
+        final SubscriptionWrapper<String> subscription = new SubscriptionWrapper<>(
+            new long[]{1L},
+            Instruction.DELETE_KEY_AND_PROPAGATE,
+            "pk1",
+            SubscriptionWrapper.VERSION_0,
+            null
+        );
+        final CombinedKey<String, String> key = new CombinedKey<>("fk1", "pk1");
+        final Change<ValueAndTimestamp<SubscriptionWrapper<String>>> change =
+            new Change<>(ValueAndTimestamp.make(subscription, 1L), null);
+        processor.process(new Record<>(key, change, 1L));
+
+        // Even though "fk1" has a stored value, the delete is answered with a null foreign value.
+        final List<CapturedForward<? extends String, ? extends SubscriptionResponseWrapper<String>>> captured =
+            mockContext.forwarded();
+        Assert.assertEquals(1, captured.size());
+        final SubscriptionResponseWrapper<String> expectedResponse =
+            new SubscriptionResponseWrapper<>(subscription.getHash(), null, null);
+        final Record<String, SubscriptionResponseWrapper<String>> expectedRecord =
+            new Record<>("pk1", expectedResponse, 1L);
+        Assert.assertEquals(expectedRecord, captured.get(0).record());
+    }
+
+    /**
+     * A version-1 DELETE_KEY_AND_PROPAGATE subscription must produce exactly one response for "pk1"
+     * that echoes the subscription hash, carries a null foreign value, and repeats the primary
+     * partition (12) the subscription was created with.
+     */
+    @Test
+    public void shouldDeleteKeyAndPropagateFKV1() {
+        final MockProcessorContext<String, SubscriptionResponseWrapper<String>> mockContext =
+            new MockProcessorContext<>();
+        processor.init(mockContext);
+
+        // Version 1 subscription, created with primary partition 12.
+        final SubscriptionWrapper<String> subscription = new SubscriptionWrapper<>(
+            new long[]{1L},
+            Instruction.DELETE_KEY_AND_PROPAGATE,
+            "pk1",
+            SubscriptionWrapper.VERSION_1,
+            12
+        );
+        final CombinedKey<String, String> key = new CombinedKey<>("fk1", "pk1");
+        final Change<ValueAndTimestamp<SubscriptionWrapper<String>>> change =
+            new Change<>(ValueAndTimestamp.make(subscription, 1L), null);
+        processor.process(new Record<>(key, change, 1L));
+
+        // Even though "fk1" has a stored value, the delete is answered with a null foreign value.
+        final List<CapturedForward<? extends String, ? extends SubscriptionResponseWrapper<String>>> captured =
+            mockContext.forwarded();
+        Assert.assertEquals(1, captured.size());
+        final SubscriptionResponseWrapper<String> expectedResponse =
+            new SubscriptionResponseWrapper<>(subscription.getHash(), null, 12);
+        final Record<String, SubscriptionResponseWrapper<String>> expectedRecord =
+            new Record<>("pk1", expectedResponse, 1L);
+        Assert.assertEquals(expectedRecord, captured.get(0).record());
+    }
+
+    /**
+     * A version-0 PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE subscription whose foreign key ("fk1") has a
+     * stored value must produce exactly one response for "pk1" carrying that value ("foo"), the
+     * subscription hash and a null primary partition.
+     */
+    @Test
+    public void shouldPropagateOnlyIfFKAvailableV0() {
+        final MockProcessorContext<String, SubscriptionResponseWrapper<String>> mockContext =
+            new MockProcessorContext<>();
+        processor.init(mockContext);
+
+        // Version 0 subscription, created without a primary partition.
+        final SubscriptionWrapper<String> subscription = new SubscriptionWrapper<>(
+            new long[]{1L},
+            Instruction.PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE,
+            "pk1",
+            SubscriptionWrapper.VERSION_0,
+            null
+        );
+        final CombinedKey<String, String> key = new CombinedKey<>("fk1", "pk1");
+        final Change<ValueAndTimestamp<SubscriptionWrapper<String>>> change =
+            new Change<>(ValueAndTimestamp.make(subscription, 1L), null);
+        processor.process(new Record<>(key, change, 1L));
+
+        // "fk1" resolves to "foo" in the fixture map, so that value is expected in the response.
+        final List<CapturedForward<? extends String, ? extends SubscriptionResponseWrapper<String>>> captured =
+            mockContext.forwarded();
+        Assert.assertEquals(1, captured.size());
+        final SubscriptionResponseWrapper<String> expectedResponse =
+            new SubscriptionResponseWrapper<>(subscription.getHash(), "foo", null);
+        final Record<String, SubscriptionResponseWrapper<String>> expectedRecord =
+            new Record<>("pk1", expectedResponse, 1L);
+        Assert.assertEquals(expectedRecord, captured.get(0).record());
+    }
+
+    /**
+     * A version-1 PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE subscription whose foreign key ("fk1") has a stored
+     * value must produce one response for "pk1" carrying "foo", the subscription hash and partition 12.
+     */
+    @Test
+    public void shouldPropagateOnlyIfFKAvailableV1() {
+        final MockProcessorContext<String, SubscriptionResponseWrapper<String>> mockContext =
+            new MockProcessorContext<>();
+        processor.init(mockContext);
+
+        // Version 1 subscription, created with primary partition 12.
+        final SubscriptionWrapper<String> subscription = new SubscriptionWrapper<>(
+            new long[]{1L},
+            Instruction.PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE,
+            "pk1",
+            SubscriptionWrapper.VERSION_1,
+            12
+        );
+        final CombinedKey<String, String> key = new CombinedKey<>("fk1", "pk1");
+        final Change<ValueAndTimestamp<SubscriptionWrapper<String>>> change =
+            new Change<>(ValueAndTimestamp.make(subscription, 1L), null);
+        processor.process(new Record<>(key, change, 1L));
+
+        // "fk1" resolves to "foo" in the fixture map, so that value is expected in the response.
+        final List<CapturedForward<? extends String, ? extends SubscriptionResponseWrapper<String>>> captured =
+            mockContext.forwarded();
+        Assert.assertEquals(1, captured.size());
+        final SubscriptionResponseWrapper<String> expectedResponse =
+            new SubscriptionResponseWrapper<>(subscription.getHash(), "foo", 12);
+        final Record<String, SubscriptionResponseWrapper<String>> expectedRecord =
+            new Record<>("pk1", expectedResponse, 1L);
+        Assert.assertEquals(expectedRecord, captured.get(0).record());
+    }
+
+    /**
+     * Exercises a version-0 PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE subscription twice on the same
+     * processor: first against the known foreign key "fk1", which must forward the stored value
+     * "foo", then against the unknown key "fk9000", which must forward a null foreign value.
+     * Both responses go to "pk1" and carry the subscription hash and a null primary partition.
+     */
+    @Test
+    public void shouldPropagateNullIfNoFKAvailableV0() {
+        final MockProcessorContext<String, SubscriptionResponseWrapper<String>> mockContext =
+            new MockProcessorContext<>();
+        processor.init(mockContext);
+
+        // Version 0 subscription, created without a primary partition.
+        final SubscriptionWrapper<String> subscription = new SubscriptionWrapper<>(
+            new long[]{1L},
+            Instruction.PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE,
+            "pk1",
+            SubscriptionWrapper.VERSION_0,
+            null
+        );
+
+        // propagate matched FK
+        final CombinedKey<String, String> knownKey = new CombinedKey<>("fk1", "pk1");
+        final Change<ValueAndTimestamp<SubscriptionWrapper<String>>> firstChange =
+            new Change<>(ValueAndTimestamp.make(subscription, 1L), null);
+        final Record<CombinedKey<String, String>, Change<ValueAndTimestamp<SubscriptionWrapper<String>>>> matchingInput =
+            new Record<>(knownKey, firstChange, 1L);
+        processor.process(matchingInput);
+
+        final List<CapturedForward<? extends String, ? extends SubscriptionResponseWrapper<String>>> afterMatch =
+            mockContext.forwarded();
+        Assert.assertEquals(1, afterMatch.size());
+        final SubscriptionResponseWrapper<String> matchedResponse =
+            new SubscriptionResponseWrapper<>(subscription.getHash(), "foo", null);
+        final Record<String, SubscriptionResponseWrapper<String>> expectedMatch =
+            new Record<>("pk1", matchedResponse, 1L);
+        Assert.assertEquals(expectedMatch, afterMatch.get(0).record());
+
+        // propagate null if there is no match
+        final CombinedKey<String, String> unknownKey = new CombinedKey<>("fk9000", "pk1");
+        final Change<ValueAndTimestamp<SubscriptionWrapper<String>>> secondChange =
+            new Change<>(ValueAndTimestamp.make(subscription, 1L), null);
+        final Record<CombinedKey<String, String>, Change<ValueAndTimestamp<SubscriptionWrapper<String>>>> unmatchedInput =
+            new Record<>(unknownKey, secondChange, 1L);
+        processor.process(unmatchedInput);
+
+        final List<CapturedForward<? extends String, ? extends SubscriptionResponseWrapper<String>>> afterMiss =
+            mockContext.forwarded();
+        Assert.assertEquals(2, afterMiss.size());
+        final SubscriptionResponseWrapper<String> unmatchedResponse =
+            new SubscriptionResponseWrapper<>(subscription.getHash(), null, null);
+        final Record<String, SubscriptionResponseWrapper<String>> expectedMiss =
+            new Record<>("pk1", unmatchedResponse, 1L);
+        Assert.assertEquals(expectedMiss, afterMiss.get(1).record());
+    }
+
+    /**
+     * A version-1 PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE subscription must forward "foo" for the known key
+     * "fk1" and a null foreign value for the unknown key "fk9000", each time with primary partition 12.
+     */
+    @Test
+    public void shouldPropagateNullIfNoFKAvailableV1() {
+        final MockProcessorContext<String, SubscriptionResponseWrapper<String>> mockContext =
+            new MockProcessorContext<>();
+        processor.init(mockContext);
+
+        // Version 1 subscription, created with primary partition 12.
+        final SubscriptionWrapper<String> subscription = new SubscriptionWrapper<>(
+            new long[]{1L},
+            Instruction.PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE,
+            "pk1",
+            SubscriptionWrapper.VERSION_1,
+            12
+        );
+
+        // propagate matched FK
+        final CombinedKey<String, String> knownKey = new CombinedKey<>("fk1", "pk1");
+        final Change<ValueAndTimestamp<SubscriptionWrapper<String>>> firstChange =
+            new Change<>(ValueAndTimestamp.make(subscription, 1L), null);
+        final Record<CombinedKey<String, String>, Change<ValueAndTimestamp<SubscriptionWrapper<String>>>> matchingInput =
+            new Record<>(knownKey, firstChange, 1L);
+        processor.process(matchingInput);
+
+        final List<CapturedForward<? extends String, ? extends SubscriptionResponseWrapper<String>>> afterMatch =
+            mockContext.forwarded();
+        Assert.assertEquals(1, afterMatch.size());
+        final SubscriptionResponseWrapper<String> matchedResponse =
+            new SubscriptionResponseWrapper<>(subscription.getHash(), "foo", 12);
+        final Record<String, SubscriptionResponseWrapper<String>> expectedMatch =
+            new Record<>("pk1", matchedResponse, 1L);
+        Assert.assertEquals(expectedMatch, afterMatch.get(0).record());
+
+        // propagate null if there is no match
+        final CombinedKey<String, String> unknownKey = new CombinedKey<>("fk9000", "pk1");
+        final Change<ValueAndTimestamp<SubscriptionWrapper<String>>> secondChange =
+            new Change<>(ValueAndTimestamp.make(subscription, 1L), null);
+        final Record<CombinedKey<String, String>, Change<ValueAndTimestamp<SubscriptionWrapper<String>>>> unmatchedInput =
+            new Record<>(unknownKey, secondChange, 1L);
+        processor.process(unmatchedInput);
+
+        final List<CapturedForward<? extends String, ? extends SubscriptionResponseWrapper<String>>> afterMiss =
+            mockContext.forwarded();
+        Assert.assertEquals(2, afterMiss.size());
+        final SubscriptionResponseWrapper<String> unmatchedResponse =
+            new SubscriptionResponseWrapper<>(subscription.getHash(), null, 12);
+        final Record<String, SubscriptionResponseWrapper<String>> expectedMiss =
+            new Record<>("pk1", unmatchedResponse, 1L);
+        Assert.assertEquals(expectedMiss, afterMiss.get(1).record());
+    }
+
+    /** A version-0 DELETE_KEY_NO_PROPAGATE subscription must not cause any record to be forwarded. */
+    @Test
+    public void shouldDeleteKeyNoPropagateV0() {
+        final MockProcessorContext<String, SubscriptionResponseWrapper<String>> mockContext =
+            new MockProcessorContext<>();
+        processor.init(mockContext);
+
+        final SubscriptionWrapper<String> subscription = new SubscriptionWrapper<>(
+            new long[]{1L},
+            Instruction.DELETE_KEY_NO_PROPAGATE,
+            "pk1",
+            SubscriptionWrapper.VERSION_0,
+            null
+        );
+        final CombinedKey<String, String> key = new CombinedKey<>("fk1", "pk1");
+        final Change<ValueAndTimestamp<SubscriptionWrapper<String>>> change =
+            new Change<>(ValueAndTimestamp.make(subscription, 1L), null);
+        processor.process(new Record<>(key, change, 1L));
+
+        Assert.assertEquals(0, mockContext.forwarded().size());
+    }
+
+    /** A version-1 DELETE_KEY_NO_PROPAGATE subscription must not cause any record to be forwarded. */
+    @Test
+    public void shouldDeleteKeyNoPropagateV1() {
+        final MockProcessorContext<String, SubscriptionResponseWrapper<String>> mockContext =
+            new MockProcessorContext<>();
+        processor.init(mockContext);
+
+        final SubscriptionWrapper<String> subscription = new SubscriptionWrapper<>(
+            new long[]{1L},
+            Instruction.DELETE_KEY_NO_PROPAGATE,
+            "pk1",
+            SubscriptionWrapper.VERSION_1,
+            12
+        );
+        final CombinedKey<String, String> key = new CombinedKey<>("fk1", "pk1");
+        final Change<ValueAndTimestamp<SubscriptionWrapper<String>>> change =
+            new Change<>(ValueAndTimestamp.make(subscription, 1L), null);
+        processor.process(new Record<>(key, change, 1L));
+        Assert.assertEquals(0, mockContext.forwarded().size());
+    }
+
+    /** Builds a supplier that always hands out the same getter, which answers lookups from {@code map}. */
+    private KTableValueGetterSupplier<String, String> valueGetterSupplier(final Map<String, ValueAndTimestamp<String>> map) {
+        final KTableValueGetter<String, String> mapBackedGetter = new KTableValueGetter<String, String>() {
+            @Override
+            public void init(final ProcessorContext context) {
+                // intentionally empty: lookups only read the supplied map
+            }
+
+            @Override
+            public ValueAndTimestamp<String> get(final String key) {
+                return map.get(key);
+            }
+        };
+        return new KTableValueGetterSupplier<String, String>() {
+            @Override
+            public String[] storeNames() {
+                return new String[0];
+            }
+
+            @Override
+            public KTableValueGetter<String, String> get() {
+                return mapBackedGetter;
+            }
+        };
+    }
+
+    /** Builds the processor under test. NOTE(review): it comes from SubscriptionJoinForeignProcessorSupplier, not the supplier this test class is named after. */
+    private Processor<
+        CombinedKey<String, String>,
+        Change<ValueAndTimestamp<SubscriptionWrapper<String>>>,
+        String,
+        SubscriptionResponseWrapper<String>> processor(final KTableValueGetterSupplier<String, String> valueGetterSupplier) {
+        return new SubscriptionJoinForeignProcessorSupplier<String, String, String>(valueGetterSupplier).get();
+    }
+}
\ No newline at end of file
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapperSerdeTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapperSerdeTest.java
index 8cd26d606d..709a94bc6d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapperSerdeTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapperSerdeTest.java
@@ -34,7 +34,7 @@ public class SubscriptionWrapperSerdeTest {
     @Test
     @SuppressWarnings("unchecked")
     public void shouldSerdeV0Test() {
-        final byte version = 0;
+        final byte version = SubscriptionWrapper.VERSION_0;
         final String originalKey = "originalKey";
         final SubscriptionWrapperSerde swSerde = new SubscriptionWrapperSerde<>(() -> "pkTopic", Serdes.String());
         final long[] hashedValue = Murmur3.hash128(new byte[] {(byte) 0xFF, (byte) 0xAA, (byte) 0x00, (byte) 0x19});
@@ -59,7 +59,7 @@ public class SubscriptionWrapperSerdeTest {
     @Test
     @SuppressWarnings("unchecked")
     public void shouldSerdeV1Test() {
-        final byte version = 1;
+        final byte version = SubscriptionWrapper.VERSION_1;
         final String originalKey = "originalKey";
         final SubscriptionWrapperSerde swSerde = new SubscriptionWrapperSerde<>(() -> "pkTopic", Serdes.String());
         final long[] hashedValue = Murmur3.hash128(new byte[] {(byte) 0xFF, (byte) 0xAA, (byte) 0x00, (byte) 0x19});
@@ -84,7 +84,7 @@ public class SubscriptionWrapperSerdeTest {
     @Test
     @SuppressWarnings("unchecked")
     public void shouldSerdeWithV0IfUpgradeTest() {
-        final byte version = 1;
+        final byte version = SubscriptionWrapper.VERSION_1;
         final String originalKey = "originalKey";
         final SubscriptionWrapperSerde swSerde = new SubscriptionWrapperSerde<>(() -> "pkTopic", Serdes.String());
         swSerde.configure(
@@ -112,7 +112,7 @@ public class SubscriptionWrapperSerdeTest {
     @Test
     @SuppressWarnings("unchecked")
     public void shouldSerdeNullHashV0Test() {
-        final byte version = 0;
+        final byte version = SubscriptionWrapper.VERSION_0;
         final String originalKey = "originalKey";
         final SubscriptionWrapperSerde swSerde = new SubscriptionWrapperSerde<>(() -> "pkTopic", Serdes.String());
         final long[] hashedValue = null;
@@ -136,7 +136,7 @@ public class SubscriptionWrapperSerdeTest {
     @Test
     @SuppressWarnings("unchecked")
     public void shouldSerdeNullHashV1Test() {
-        final byte version = 1;
+        final byte version = SubscriptionWrapper.VERSION_1;
         final String originalKey = "originalKey";
         final SubscriptionWrapperSerde swSerde = new SubscriptionWrapperSerde<>(() -> "pkTopic", Serdes.String());
         final long[] hashedValue = null;
@@ -164,7 +164,7 @@ public class SubscriptionWrapperSerdeTest {
         final SubscriptionWrapperSerde swSerde = new SubscriptionWrapperSerde<>(() -> "pkTopic", Serdes.String());
         final long[] hashedValue = Murmur3.hash128(new byte[] {(byte) 0xFF, (byte) 0xAA, (byte) 0x00, (byte) 0x19});
         final Integer primaryPartition = null;
-        final byte version = 0;
+        final byte version = SubscriptionWrapper.VERSION_0;
         final SubscriptionWrapper wrapper = new SubscriptionWrapper<>(
             hashedValue,
             SubscriptionWrapper.Instruction.PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE,
@@ -189,7 +189,7 @@ public class SubscriptionWrapperSerdeTest {
         assertThrows(NullPointerException.class, () -> new SubscriptionWrapper<>(hashedValue,
             SubscriptionWrapper.Instruction.PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE,
             originalKey,
-            (byte) 0,
+            SubscriptionWrapper.VERSION_0,
             primaryPartition));
     }
 
@@ -201,7 +201,7 @@ public class SubscriptionWrapperSerdeTest {
         assertThrows(NullPointerException.class, () -> new SubscriptionWrapper<>(hashedValue,
             SubscriptionWrapper.Instruction.PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE,
             originalKey,
-            (byte) 1,
+            SubscriptionWrapper.VERSION_1,
             primaryPartition));
     }
 
@@ -214,7 +214,7 @@ public class SubscriptionWrapperSerdeTest {
             hashedValue,
             null,
             originalKey,
-            (byte) 0,
+            SubscriptionWrapper.VERSION_0,
             primaryPartition));
     }
 
@@ -227,7 +227,7 @@ public class SubscriptionWrapperSerdeTest {
             hashedValue,
             null,
             originalKey,
-            (byte) 0,
+            SubscriptionWrapper.VERSION_0,
             primaryPartition));
     }
 
@@ -241,7 +241,7 @@ public class SubscriptionWrapperSerdeTest {
             hashedValue,
             SubscriptionWrapper.Instruction.PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE,
             originalKey,
-            (byte) 1,
+            SubscriptionWrapper.VERSION_1,
             primaryPartition);
         assertThrows(NullPointerException.class, () -> swSerde.serializer().serialize(null, wrapper));
     }