You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by vv...@apache.org on 2022/07/27 18:58:37 UTC
[kafka] branch trunk updated: KAFKA-13769: Add tests for ForeignJoinSubscriptionProcessorSupplier (#12437)
This is an automated email from the ASF dual-hosted git repository.
vvcephei pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new d076b7ad0e KAFKA-13769: Add tests for ForeignJoinSubscriptionProcessorSupplier (#12437)
d076b7ad0e is described below
commit d076b7ad0ee3a2dba710ff79d6c4dd5840766e9f
Author: Alex Sorokoumov <91...@users.noreply.github.com>
AuthorDate: Wed Jul 27 20:58:12 2022 +0200
KAFKA-13769: Add tests for ForeignJoinSubscriptionProcessorSupplier (#12437)
Reviewers: Adam Bellemare <ad...@gmail.com>, John Roesler <vv...@apache.org>
---
.../SubscriptionResponseWrapper.java | 24 ++
.../foreignkeyjoin/SubscriptionWrapper.java | 5 +-
...reignJoinSubscriptionProcessorSupplierTest.java | 377 +++++++++++++++++++++
.../SubscriptionWrapperSerdeTest.java | 22 +-
4 files changed, 416 insertions(+), 12 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapper.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapper.java
index ef9a3a055b..99556f16e4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapper.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapper.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.kstream.internals.foreignkeyjoin;
+import java.util.Objects;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import java.util.Arrays;
@@ -72,4 +73,27 @@ public class SubscriptionResponseWrapper<FV> {
", primaryPartition=" + primaryPartition +
'}';
}
+
+ @Override
+ public boolean equals(final Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ final SubscriptionResponseWrapper<?> other = (SubscriptionResponseWrapper<?>) o;
+ // originalValueHash is an array, so it is compared by content rather than by reference.
+ return version == other.version
+ && Arrays.equals(originalValueHash, other.originalValueHash)
+ && Objects.equals(foreignValue, other.foreignValue)
+ && Objects.equals(primaryPartition, other.primaryPartition);
+ }
+
+ @Override
+ public int hashCode() {
+ // Hash the non-array fields together, then fold in the content hash of the array field.
+ final int fieldsHash = Objects.hash(foreignValue, version, primaryPartition);
+ return 31 * fieldsHash + Arrays.hashCode(originalValueHash);
+ }
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapper.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapper.java
index a75a419cd7..41d5f1198e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapper.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapper.java
@@ -23,7 +23,10 @@ import java.util.Objects;
public class SubscriptionWrapper<K> {
- static final byte CURRENT_VERSION = 1;
+ static final byte VERSION_0 = 0;
+ static final byte VERSION_1 = 1;
+
+ static final byte CURRENT_VERSION = VERSION_1;
// v0 fields:
private final long[] hash;
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignJoinSubscriptionProcessorSupplierTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignJoinSubscriptionProcessorSupplierTest.java
new file mode 100644
index 0000000000..1bf708b8ab
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignJoinSubscriptionProcessorSupplierTest.java
@@ -0,0 +1,377 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals.foreignkeyjoin;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import org.apache.kafka.streams.kstream.internals.Change;
+import org.apache.kafka.streams.kstream.internals.KTableValueGetter;
+import org.apache.kafka.streams.kstream.internals.KTableValueGetterSupplier;
+import org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionWrapper.Instruction;
+import org.apache.kafka.streams.processor.api.MockProcessorContext;
+import org.apache.kafka.streams.processor.api.MockProcessorContext.CapturedForward;
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class ForeignJoinSubscriptionProcessorSupplierTest {
+ final Map<String, ValueAndTimestamp<String>> fks = Collections.singletonMap(
+ "fk1", ValueAndTimestamp.make("foo", 1L)
+ ); // the only foreign key the stub value getter can resolve: "fk1" -> "foo"
+ final KTableValueGetterSupplier<String, String> valueGetterSupplier = valueGetterSupplier(fks);
+ final Processor<CombinedKey<String, String>,
+ Change<ValueAndTimestamp<SubscriptionWrapper<String>>>,
+ String,
+ SubscriptionResponseWrapper<String>>
+ processor = processor(valueGetterSupplier); // instance field: each test calls init(...) on it with its own context
+
+ @Test
+ public void shouldDetectVersionChange() {
+ // Fails as soon as SubscriptionWrapper.CURRENT_VERSION no longer equals VERSION_1: a reminder to add tests for the new version.
+ Assert.assertEquals(SubscriptionWrapper.VERSION_1, SubscriptionWrapper.CURRENT_VERSION);
+ }
+
+ @Test
+ public void shouldDeleteKeyAndPropagateFKV0() {
+ final MockProcessorContext<String, SubscriptionResponseWrapper<String>> context = new MockProcessorContext<>();
+ processor.init(context);
+ // VERSION_0 subscription (null primary partition): the delete is answered with exactly one response whose foreign value is null.
+ final SubscriptionWrapper<String> newValue = new SubscriptionWrapper<>(
+ new long[]{1L},
+ Instruction.DELETE_KEY_AND_PROPAGATE,
+ "pk1",
+ SubscriptionWrapper.VERSION_0,
+ null
+ );
+ final Record<CombinedKey<String, String>, Change<ValueAndTimestamp<SubscriptionWrapper<String>>>> record =
+ new Record<>(
+ new CombinedKey<>("fk1", "pk1"),
+ new Change<>(ValueAndTimestamp.make(newValue, 1L), null),
+ 1L
+ );
+ processor.process(record);
+ final List<CapturedForward<? extends String, ? extends SubscriptionResponseWrapper<String>>> forwarded = context.forwarded();
+ Assert.assertEquals(1, forwarded.size());
+ Assert.assertEquals(
+ new Record<>(
+ "pk1",
+ new SubscriptionResponseWrapper<>(
+ newValue.getHash(),
+ null,
+ null),
+ 1L
+ ),
+ forwarded.get(0).record()
+ );
+ }
+
+ @Test
+ public void shouldDeleteKeyAndPropagateFKV1() {
+ final MockProcessorContext<String, SubscriptionResponseWrapper<String>> context = new MockProcessorContext<>();
+ processor.init(context);
+ // VERSION_1 subscription with primary partition 12: the single response has a null foreign value and echoes partition 12.
+ final SubscriptionWrapper<String> newValue = new SubscriptionWrapper<>(
+ new long[]{1L},
+ Instruction.DELETE_KEY_AND_PROPAGATE,
+ "pk1",
+ SubscriptionWrapper.VERSION_1,
+ 12
+ );
+ final Record<CombinedKey<String, String>, Change<ValueAndTimestamp<SubscriptionWrapper<String>>>> record =
+ new Record<>(
+ new CombinedKey<>("fk1", "pk1"),
+ new Change<>(ValueAndTimestamp.make(newValue, 1L), null),
+ 1L
+ );
+ processor.process(record);
+ final List<CapturedForward<? extends String, ? extends SubscriptionResponseWrapper<String>>> forwarded = context.forwarded();
+ Assert.assertEquals(1, forwarded.size());
+ Assert.assertEquals(
+ new Record<>(
+ "pk1",
+ new SubscriptionResponseWrapper<>(
+ newValue.getHash(),
+ null,
+ 12
+ ),
+ 1L
+ ),
+ forwarded.get(0).record()
+ );
+ }
+
+ @Test
+ public void shouldPropagateOnlyIfFKAvailableV0() {
+ final MockProcessorContext<String, SubscriptionResponseWrapper<String>> context = new MockProcessorContext<>();
+ processor.init(context);
+ // "fk1" is present in the stub value getter, so one response carrying "foo" is expected; the VERSION_0 partition stays null.
+ final SubscriptionWrapper<String> newValue = new SubscriptionWrapper<>(
+ new long[]{1L},
+ Instruction.PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE,
+ "pk1",
+ SubscriptionWrapper.VERSION_0,
+ null
+ );
+ final Record<CombinedKey<String, String>, Change<ValueAndTimestamp<SubscriptionWrapper<String>>>> record =
+ new Record<>(
+ new CombinedKey<>("fk1", "pk1"),
+ new Change<>(ValueAndTimestamp.make(newValue, 1L), null),
+ 1L
+ );
+ processor.process(record);
+ final List<CapturedForward<? extends String, ? extends SubscriptionResponseWrapper<String>>> forwarded = context.forwarded();
+ Assert.assertEquals(1, forwarded.size());
+ Assert.assertEquals(
+ new Record<>(
+ "pk1",
+ new SubscriptionResponseWrapper<>(
+ newValue.getHash(),
+ "foo",
+ null
+ ),
+ 1L
+ ),
+ forwarded.get(0).record()
+ );
+ }
+
+ @Test
+ public void shouldPropagateOnlyIfFKAvailableV1() {
+ final MockProcessorContext<String, SubscriptionResponseWrapper<String>> context = new MockProcessorContext<>();
+ processor.init(context);
+ // "fk1" is present in the stub value getter, so one response carrying "foo" and primary partition 12 is expected.
+ final SubscriptionWrapper<String> newValue = new SubscriptionWrapper<>(
+ new long[]{1L},
+ Instruction.PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE,
+ "pk1",
+ SubscriptionWrapper.VERSION_1,
+ 12
+ );
+ final Record<CombinedKey<String, String>, Change<ValueAndTimestamp<SubscriptionWrapper<String>>>> record =
+ new Record<>(
+ new CombinedKey<>("fk1", "pk1"),
+ new Change<>(ValueAndTimestamp.make(newValue, 1L), null),
+ 1L
+ );
+ processor.process(record);
+ final List<CapturedForward<? extends String, ? extends SubscriptionResponseWrapper<String>>> forwarded = context.forwarded();
+ Assert.assertEquals(1, forwarded.size());
+ Assert.assertEquals(
+ new Record<>(
+ "pk1",
+ new SubscriptionResponseWrapper<>(
+ newValue.getHash(),
+ "foo",
+ 12
+ ),
+ 1L
+ ),
+ forwarded.get(0).record());
+ }
+
+ @Test
+ public void shouldPropagateNullIfNoFKAvailableV0() {
+ final MockProcessorContext<String, SubscriptionResponseWrapper<String>> context = new MockProcessorContext<>();
+ processor.init(context);
+ // The same VERSION_0 subscription is processed twice: first under "fk1" (present in the stub getter), then under "fk9000" (absent).
+ final SubscriptionWrapper<String> newValue = new SubscriptionWrapper<>(
+ new long[]{1L},
+ Instruction.PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE,
+ "pk1",
+ SubscriptionWrapper.VERSION_0,
+ null
+ );
+ Record<CombinedKey<String, String>, Change<ValueAndTimestamp<SubscriptionWrapper<String>>>> record =
+ new Record<>(
+ new CombinedKey<>("fk1", "pk1"),
+ new Change<>(ValueAndTimestamp.make(newValue, 1L), null),
+ 1L
+ );
+ processor.process(record);
+ // "fk1" resolves to "foo", which is propagated as the foreign value
+ List<CapturedForward<? extends String, ? extends SubscriptionResponseWrapper<String>>> forwarded = context.forwarded();
+ Assert.assertEquals(1, forwarded.size());
+ Assert.assertEquals(
+ new Record<>(
+ "pk1",
+ new SubscriptionResponseWrapper<>(
+ newValue.getHash(),
+ "foo",
+ null
+ ),
+ 1L
+ ),
+ forwarded.get(0).record());
+
+ record = new Record<>(
+ new CombinedKey<>("fk9000", "pk1"),
+ new Change<>(ValueAndTimestamp.make(newValue, 1L), null),
+ 1L
+ );
+ processor.process(record);
+ // "fk9000" has no entry, so a null foreign value is propagated; context.forwarded() now holds both responses
+ forwarded = context.forwarded();
+ Assert.assertEquals(2, forwarded.size());
+ Assert.assertEquals(
+ new Record<>(
+ "pk1",
+ new SubscriptionResponseWrapper<>(
+ newValue.getHash(),
+ null,
+ null
+ ),
+ 1L
+ ),
+ forwarded.get(1).record());
+ }
+
+ @Test
+ public void shouldPropagateNullIfNoFKAvailableV1() {
+ final MockProcessorContext<String, SubscriptionResponseWrapper<String>> context = new MockProcessorContext<>();
+ processor.init(context);
+ // A VERSION_1 subscription is processed twice: under "fk1" (resolves to "foo"), then under "fk9000" (absent); both responses echo partition 12.
+ final SubscriptionWrapper<String> newValue = new SubscriptionWrapper<>(
+ new long[]{1L},
+ Instruction.PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE,
+ "pk1",
+ SubscriptionWrapper.VERSION_1,
+ 12);
+ Record<CombinedKey<String, String>, Change<ValueAndTimestamp<SubscriptionWrapper<String>>>> record =
+ new Record<>(
+ new CombinedKey<>("fk1", "pk1"),
+ new Change<>(ValueAndTimestamp.make(newValue, 1L), null),
+ 1L
+ );
+ processor.process(record);
+ List<CapturedForward<? extends String, ? extends SubscriptionResponseWrapper<String>>> forwarded = context.forwarded();
+ Assert.assertEquals(1, forwarded.size());
+ Assert.assertEquals(
+ new Record<>(
+ "pk1",
+ new SubscriptionResponseWrapper<>(
+ newValue.getHash(),
+ "foo",
+ 12
+ ),
+ 1L
+ ),
+ forwarded.get(0).record());
+
+ record = new Record<>(
+ new CombinedKey<>("fk9000", "pk1"),
+ new Change<>(ValueAndTimestamp.make(newValue, 1L), null),
+ 1L
+ );
+ processor.process(record);
+ // "fk9000" has no entry, so a null foreign value is propagated as the second forwarded response
+ forwarded = context.forwarded();
+ Assert.assertEquals(2, forwarded.size());
+ Assert.assertEquals(
+ new Record<>(
+ "pk1",
+ new SubscriptionResponseWrapper<>(
+ newValue.getHash(),
+ null,
+ 12
+ ),
+ 1L
+ ),
+ forwarded.get(1).record());
+ }
+
+ @Test
+ public void shouldDeleteKeyNoPropagateV0() {
+ final MockProcessorContext<String, SubscriptionResponseWrapper<String>> context = new MockProcessorContext<>();
+ processor.init(context);
+ // DELETE_KEY_NO_PROPAGATE must not forward anything, even though "fk1" is present in the stub value getter.
+ final SubscriptionWrapper<String> newValue = new SubscriptionWrapper<>(
+ new long[]{1L},
+ Instruction.DELETE_KEY_NO_PROPAGATE,
+ "pk1",
+ SubscriptionWrapper.VERSION_0,
+ null);
+ final Record<CombinedKey<String, String>, Change<ValueAndTimestamp<SubscriptionWrapper<String>>>> record =
+ new Record<>(
+ new CombinedKey<>("fk1", "pk1"),
+ new Change<>(ValueAndTimestamp.make(newValue, 1L), null),
+ 1L
+ );
+ processor.process(record);
+ final List<CapturedForward<? extends String, ? extends SubscriptionResponseWrapper<String>>> forwarded = context.forwarded();
+ Assert.assertEquals(0, forwarded.size());
+ }
+
+ @Test
+ public void shouldDeleteKeyNoPropagateV1() {
+ final MockProcessorContext<String, SubscriptionResponseWrapper<String>> context = new MockProcessorContext<>();
+ processor.init(context);
+ // DELETE_KEY_NO_PROPAGATE with a VERSION_1 subscription (primary partition 12): nothing may be forwarded.
+ final SubscriptionWrapper<String> newValue = new SubscriptionWrapper<>(
+ new long[]{1L},
+ Instruction.DELETE_KEY_NO_PROPAGATE,
+ "pk1",
+ SubscriptionWrapper.VERSION_1,
+ 12);
+ final Record<CombinedKey<String, String>, Change<ValueAndTimestamp<SubscriptionWrapper<String>>>> record =
+ new Record<>(new CombinedKey<>("fk1", "pk1"),
+ new Change<>(ValueAndTimestamp.make(newValue, 1L), null),
+ 1L
+ );
+ processor.process(record);
+ final List<CapturedForward<? extends String, ? extends SubscriptionResponseWrapper<String>>> forwarded = context.forwarded();
+ Assert.assertEquals(0, forwarded.size());
+ }
+
+ private KTableValueGetterSupplier<String, String> valueGetterSupplier(final Map<String, ValueAndTimestamp<String>> map) {
+ final KTableValueGetter<String, String> valueGetter = new KTableValueGetter<String, String>() {
+ // Map-backed stub: look-ups are answered straight from the supplied map, so a key without an entry yields null.
+ @Override
+ public ValueAndTimestamp<String> get(final String key) {
+ return map.get(key);
+ }
+
+ @Override
+ public void init(final ProcessorContext context) {
+ // intentionally empty: the stub reads only from the in-memory map and uses nothing from the context
+ }
+ };
+ return new KTableValueGetterSupplier<String, String>() {
+ @Override
+ public KTableValueGetter<String, String> get() {
+ return valueGetter;
+ }
+
+ @Override
+ public String[] storeNames() {
+ return new String[0];
+ }
+ };
+ }
+
+ private Processor<CombinedKey<String, String>,
+ Change<ValueAndTimestamp<SubscriptionWrapper<String>>>,
+ String,
+ SubscriptionResponseWrapper<String>> processor(final KTableValueGetterSupplier<String, String> valueGetterSupplier) {
+ final SubscriptionJoinForeignProcessorSupplier<String, String, String> supplier =
+ new SubscriptionJoinForeignProcessorSupplier<>(valueGetterSupplier); // NOTE(review): this test class is named after ForeignJoinSubscriptionProcessorSupplier, yet the supplier built here is SubscriptionJoinForeignProcessorSupplier - confirm the intended class under test
+ return supplier.get(); // the processor under test, obtained from the supplier wired to the stub value getter
+ }
+}
\ No newline at end of file
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapperSerdeTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapperSerdeTest.java
index 8cd26d606d..709a94bc6d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapperSerdeTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapperSerdeTest.java
@@ -34,7 +34,7 @@ public class SubscriptionWrapperSerdeTest {
@Test
@SuppressWarnings("unchecked")
public void shouldSerdeV0Test() {
- final byte version = 0;
+ final byte version = SubscriptionWrapper.VERSION_0;
final String originalKey = "originalKey";
final SubscriptionWrapperSerde swSerde = new SubscriptionWrapperSerde<>(() -> "pkTopic", Serdes.String());
final long[] hashedValue = Murmur3.hash128(new byte[] {(byte) 0xFF, (byte) 0xAA, (byte) 0x00, (byte) 0x19});
@@ -59,7 +59,7 @@ public class SubscriptionWrapperSerdeTest {
@Test
@SuppressWarnings("unchecked")
public void shouldSerdeV1Test() {
- final byte version = 1;
+ final byte version = SubscriptionWrapper.VERSION_1;
final String originalKey = "originalKey";
final SubscriptionWrapperSerde swSerde = new SubscriptionWrapperSerde<>(() -> "pkTopic", Serdes.String());
final long[] hashedValue = Murmur3.hash128(new byte[] {(byte) 0xFF, (byte) 0xAA, (byte) 0x00, (byte) 0x19});
@@ -84,7 +84,7 @@ public class SubscriptionWrapperSerdeTest {
@Test
@SuppressWarnings("unchecked")
public void shouldSerdeWithV0IfUpgradeTest() {
- final byte version = 1;
+ final byte version = SubscriptionWrapper.VERSION_1;
final String originalKey = "originalKey";
final SubscriptionWrapperSerde swSerde = new SubscriptionWrapperSerde<>(() -> "pkTopic", Serdes.String());
swSerde.configure(
@@ -112,7 +112,7 @@ public class SubscriptionWrapperSerdeTest {
@Test
@SuppressWarnings("unchecked")
public void shouldSerdeNullHashV0Test() {
- final byte version = 0;
+ final byte version = SubscriptionWrapper.VERSION_0;
final String originalKey = "originalKey";
final SubscriptionWrapperSerde swSerde = new SubscriptionWrapperSerde<>(() -> "pkTopic", Serdes.String());
final long[] hashedValue = null;
@@ -136,7 +136,7 @@ public class SubscriptionWrapperSerdeTest {
@Test
@SuppressWarnings("unchecked")
public void shouldSerdeNullHashV1Test() {
- final byte version = 1;
+ final byte version = SubscriptionWrapper.VERSION_1;
final String originalKey = "originalKey";
final SubscriptionWrapperSerde swSerde = new SubscriptionWrapperSerde<>(() -> "pkTopic", Serdes.String());
final long[] hashedValue = null;
@@ -164,7 +164,7 @@ public class SubscriptionWrapperSerdeTest {
final SubscriptionWrapperSerde swSerde = new SubscriptionWrapperSerde<>(() -> "pkTopic", Serdes.String());
final long[] hashedValue = Murmur3.hash128(new byte[] {(byte) 0xFF, (byte) 0xAA, (byte) 0x00, (byte) 0x19});
final Integer primaryPartition = null;
- final byte version = 0;
+ final byte version = SubscriptionWrapper.VERSION_0;
final SubscriptionWrapper wrapper = new SubscriptionWrapper<>(
hashedValue,
SubscriptionWrapper.Instruction.PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE,
@@ -189,7 +189,7 @@ public class SubscriptionWrapperSerdeTest {
assertThrows(NullPointerException.class, () -> new SubscriptionWrapper<>(hashedValue,
SubscriptionWrapper.Instruction.PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE,
originalKey,
- (byte) 0,
+ SubscriptionWrapper.VERSION_0,
primaryPartition));
}
@@ -201,7 +201,7 @@ public class SubscriptionWrapperSerdeTest {
assertThrows(NullPointerException.class, () -> new SubscriptionWrapper<>(hashedValue,
SubscriptionWrapper.Instruction.PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE,
originalKey,
- (byte) 1,
+ SubscriptionWrapper.VERSION_1,
primaryPartition));
}
@@ -214,7 +214,7 @@ public class SubscriptionWrapperSerdeTest {
hashedValue,
null,
originalKey,
- (byte) 0,
+ SubscriptionWrapper.VERSION_0,
primaryPartition));
}
@@ -227,7 +227,7 @@ public class SubscriptionWrapperSerdeTest {
hashedValue,
null,
originalKey,
- (byte) 0,
+ SubscriptionWrapper.VERSION_0,
primaryPartition));
}
@@ -241,7 +241,7 @@ public class SubscriptionWrapperSerdeTest {
hashedValue,
SubscriptionWrapper.Instruction.PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE,
originalKey,
- (byte) 1,
+ SubscriptionWrapper.VERSION_1,
primaryPartition);
assertThrows(NullPointerException.class, () -> swSerde.serializer().serialize(null, wrapper));
}