You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2023/02/16 02:08:00 UTC
[kafka] branch trunk updated: KAFKA-14491: [8/N] Add serdes for ValueAndTimestamp with null value (#13249)
This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new dcaf95a35f4 KAFKA-14491: [8/N] Add serdes for ValueAndTimestamp with null value (#13249)
dcaf95a35f4 is described below
commit dcaf95a35f4ba7de73433aa5a5b1a33427744e45
Author: Victoria Xia <vi...@confluent.io>
AuthorDate: Wed Feb 15 18:07:47 2023 -0800
KAFKA-14491: [8/N] Add serdes for ValueAndTimestamp with null value (#13249)
Introduces a new Serde that serializes a value and timestamp as a single byte array, where the value may be null (in order to represent putting a tombstone with a timestamp into the versioned store).
Part of KIP-889.
Reviewers: Matthias J. Sax <ma...@confluent.io>
---
gradle/spotbugs-exclude.xml | 7 ++
.../kafka/streams/state/ValueAndTimestamp.java | 14 ++-
.../NullableValueAndTimestampDeserializer.java | 105 +++++++++++++++++++++
.../internals/NullableValueAndTimestampSerde.java | 89 +++++++++++++++++
.../NullableValueAndTimestampSerializer.java | 92 ++++++++++++++++++
.../NullableValueAndTimestampSerdeTest.java | 72 ++++++++++++++
6 files changed, 378 insertions(+), 1 deletion(-)
diff --git a/gradle/spotbugs-exclude.xml b/gradle/spotbugs-exclude.xml
index efc1f03b950..e040e54d306 100644
--- a/gradle/spotbugs-exclude.xml
+++ b/gradle/spotbugs-exclude.xml
@@ -488,6 +488,13 @@ For a detailed description of spotbugs bug categories, see https://spotbugs.read
<!-- END Suppress warnings for unused members that are undetectably used by Jackson -->
+ <Match>
+ <!-- Boolean deserializer intentionally returns null on null input. -->
+ <Class name="org.apache.kafka.streams.state.internals.NullableValueAndTimestampSerde$BooleanSerde$BooleanDeserializer"/>
+ <Method name="deserialize"/>
+ <Bug pattern="NP_BOOLEAN_RETURN_NULL"/>
+ </Match>
+
<Match>
<!-- Suppress a warning about ignoring return value because this is intentional. -->
<Class name="org.apache.kafka.common.config.AbstractConfig$ResolvingMap"/>
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/ValueAndTimestamp.java b/streams/src/main/java/org/apache/kafka/streams/state/ValueAndTimestamp.java
index 1cf727712b2..227021d6cf7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/ValueAndTimestamp.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/ValueAndTimestamp.java
@@ -31,7 +31,6 @@ public final class ValueAndTimestamp<V> {
private ValueAndTimestamp(final V value,
final long timestamp) {
- Objects.requireNonNull(value);
+        // value may now be null: makeAllowNullable(...) routes through this constructor so that a tombstone can still carry a timestamp
this.value = value;
this.timestamp = timestamp;
}
@@ -50,6 +49,19 @@ public final class ValueAndTimestamp<V> {
return value == null ? null : new ValueAndTimestamp<>(value, timestamp);
}
+    /**
+     * Creates a {@link ValueAndTimestamp} wrapping the given {@code value} and {@code timestamp}.
+     * A {@code null} {@code value} is accepted and kept as-is, which allows a tombstone that still
+     * carries a timestamp to be represented.
+     *
+     * @param value     the value to wrap; may be {@code null}
+     * @param timestamp the timestamp to wrap
+     * @param <V>       the type of the value
+     * @return a newly constructed {@link ValueAndTimestamp}; never {@code null}
+     */
+    public static <V> ValueAndTimestamp<V> makeAllowNullable(final V value, final long timestamp) {
+        return new ValueAndTimestamp<>(value, timestamp);
+    }
+
/**
* Return the wrapped {@code value} of the given {@code valueAndTimestamp} parameter
* if the parameter is not {@code null}.
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/NullableValueAndTimestampDeserializer.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/NullableValueAndTimestampDeserializer.java
new file mode 100644
index 00000000000..7d70708ff12
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/NullableValueAndTimestampDeserializer.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import static org.apache.kafka.streams.kstream.internals.WrappingNullableUtils.initNullableDeserializer;
+import static org.apache.kafka.streams.state.internals.NullableValueAndTimestampSerde.RAW_BOOLEAN_LENGTH;
+import static org.apache.kafka.streams.state.internals.NullableValueAndTimestampSerde.RAW_TIMESTAMP_LENGTH;
+
+import java.util.Map;
+import java.util.Objects;
+import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.streams.kstream.internals.WrappingNullableDeserializer;
+import org.apache.kafka.streams.processor.internals.SerdeGetter;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.streams.state.internals.NullableValueAndTimestampSerde.BooleanSerde.BooleanDeserializer;
+
+/**
+ * Deserializer for {@link ValueAndTimestamp} instances whose value may be {@code null}.
+ * See {@link NullableValueAndTimestampSerde} for the serialized format.
+ */
+public class NullableValueAndTimestampDeserializer<V> implements WrappingNullableDeserializer<ValueAndTimestamp<V>, Void, V> {
+    public final Deserializer<V> valueDeserializer;
+    private final Deserializer<Long> timestampDeserializer;
+    private final Deserializer<Boolean> booleanDeserializer;
+
+    NullableValueAndTimestampDeserializer(final Deserializer<V> valueDeserializer) {
+        this.valueDeserializer = Objects.requireNonNull(valueDeserializer);
+        timestampDeserializer = new LongDeserializer();
+        booleanDeserializer = new BooleanDeserializer();
+    }
+
+    @Override
+    public void configure(final Map<String, ?> configs, final boolean isKey) {
+        valueDeserializer.configure(configs, isKey);
+        timestampDeserializer.configure(configs, isKey);
+        booleanDeserializer.configure(configs, isKey);
+    }
+
+    /**
+     * Deserializes bytes in the layout written by {@link NullableValueAndTimestampSerializer}.
+     *
+     * @param topic                topic associated with the data
+     * @param rawValueAndTimestamp serialized bytes; may be {@code null}
+     * @return {@code null} if {@code rawValueAndTimestamp} is {@code null}; otherwise a
+     *         {@link ValueAndTimestamp} whose value is {@code null} exactly when the tombstone flag is set
+     * @throws SerializationException if the input is shorter than the timestamp plus the tombstone flag,
+     *         or if a non-tombstone value deserializes to {@code null}
+     */
+    @Override
+    public ValueAndTimestamp<V> deserialize(final String topic, final byte[] rawValueAndTimestamp) {
+        if (rawValueAndTimestamp == null) {
+            return null;
+        }
+        // Reject truncated input up front; otherwise the copy helpers below would fail with an
+        // ArrayIndexOutOfBoundsException instead of the SerializationException callers expect.
+        final int headerLength = RAW_TIMESTAMP_LENGTH + RAW_BOOLEAN_LENGTH;
+        if (rawValueAndTimestamp.length < headerLength) {
+            throw new SerializationException("Serialized ValueAndTimestamp is too short: expected at least "
+                + headerLength + " bytes but got " + rawValueAndTimestamp.length);
+        }
+
+        final long timestamp = timestampDeserializer.deserialize(topic, rawTimestamp(rawValueAndTimestamp));
+        final boolean isTombstone = booleanDeserializer.deserialize(topic, rawIsTombstone(rawValueAndTimestamp));
+        if (isTombstone) {
+            return ValueAndTimestamp.makeAllowNullable(null, timestamp);
+        } else {
+            final V value = valueDeserializer.deserialize(topic, rawValue(rawValueAndTimestamp));
+            // The tombstone flag is the only legitimate source of a null value.
+            if (value == null) {
+                throw new SerializationException("Deserializer cannot deserialize non-null bytes as null");
+            }
+            return ValueAndTimestamp.make(value, timestamp);
+        }
+    }
+
+    @Override
+    public void close() {
+        valueDeserializer.close();
+        timestampDeserializer.close();
+        booleanDeserializer.close();
+    }
+
+    @Override
+    public void setIfUnset(final SerdeGetter getter) {
+        // NullableValueAndTimestampDeserializer never wraps a null deserializer (the constructor rejects it),
+        // but it may wrap a deserializer that itself wraps a null deserializer.
+        initNullableDeserializer(valueDeserializer, getter);
+    }
+
+    /** Copies the leading timestamp bytes; assumes the length was already validated. */
+    private static byte[] rawTimestamp(final byte[] rawValueAndTimestamp) {
+        final byte[] rawTimestamp = new byte[RAW_TIMESTAMP_LENGTH];
+        System.arraycopy(rawValueAndTimestamp, 0, rawTimestamp, 0, RAW_TIMESTAMP_LENGTH);
+        return rawTimestamp;
+    }
+
+    /** Copies the tombstone flag that directly follows the timestamp. */
+    private static byte[] rawIsTombstone(final byte[] rawValueAndTimestamp) {
+        final byte[] rawIsTombstone = new byte[RAW_BOOLEAN_LENGTH];
+        System.arraycopy(rawValueAndTimestamp, RAW_TIMESTAMP_LENGTH, rawIsTombstone, 0, RAW_BOOLEAN_LENGTH);
+        return rawIsTombstone;
+    }
+
+    /** Copies everything after the timestamp and tombstone flag (possibly zero bytes). */
+    private static byte[] rawValue(final byte[] rawValueAndTimestamp) {
+        final int rawValueLength = rawValueAndTimestamp.length - RAW_TIMESTAMP_LENGTH - RAW_BOOLEAN_LENGTH;
+        final byte[] rawValue = new byte[rawValueLength];
+        System.arraycopy(rawValueAndTimestamp, RAW_TIMESTAMP_LENGTH + RAW_BOOLEAN_LENGTH, rawValue, 0, rawValueLength);
+        return rawValue;
+    }
+}
\ No newline at end of file
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/NullableValueAndTimestampSerde.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/NullableValueAndTimestampSerde.java
new file mode 100644
index 00000000000..090a4daa35c
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/NullableValueAndTimestampSerde.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import static java.util.Objects.requireNonNull;
+
+import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.streams.kstream.internals.WrappingNullableSerde;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+
+/**
+ * Similar to {@link ValueAndTimestampSerde} but this serde additionally supports (de)serializing
+ * {@link ValueAndTimestamp} instances for which the {@code value} is {@code null}.
+ * <p>
+ * The serialized format is:
+ * <pre>{@code
+ * <timestamp> + <bool indicating whether value is null> + <raw value>
+ * }</pre>
+ * where the timestamp occupies 8 bytes and the boolean occupies 1 byte. The boolean is needed in
+ * order to distinguish between null and empty values (i.e., between tombstones and {@code byte[0]}
+ * values); for a tombstone the raw value section is empty.
+ */
+public class NullableValueAndTimestampSerde<V> extends WrappingNullableSerde<ValueAndTimestamp<V>, Void, V> {
+
+    // Number of bytes used by the serialized timestamp.
+    static final int RAW_TIMESTAMP_LENGTH = 8;
+    // Number of bytes used by the serialized "value is null" flag.
+    static final int RAW_BOOLEAN_LENGTH = 1;
+
+    public NullableValueAndTimestampSerde(final Serde<V> valueSerde) {
+        super(
+            new NullableValueAndTimestampSerializer<>(requireNonNull(valueSerde).serializer()),
+            new NullableValueAndTimestampDeserializer<>(requireNonNull(valueSerde).deserializer())
+        );
+    }
+
+    /**
+     * Minimal one-byte boolean serde used for the tombstone flag:
+     * {@code 0x01} encodes {@code true}, {@code 0x00} encodes {@code false}.
+     */
+    static final class BooleanSerde {
+        private static final byte TRUE = 0x01;
+        private static final byte FALSE = 0x00;
+
+        static class BooleanSerializer implements Serializer<Boolean> {
+            @Override
+            public byte[] serialize(final String topic, final Boolean data) {
+                if (data == null) {
+                    return null;
+                }
+
+                return new byte[] {
+                    data ? TRUE : FALSE
+                };
+            }
+        }
+
+        static class BooleanDeserializer implements Deserializer<Boolean> {
+            @Override
+            public Boolean deserialize(final String topic, final byte[] data) {
+                // Returning null for null input is intentional (see spotbugs exclusion).
+                if (data == null) {
+                    return null;
+                }
+
+                if (data.length != RAW_BOOLEAN_LENGTH) {
+                    throw new SerializationException("Size of data received by BooleanDeserializer is not 1");
+                }
+
+                if (data[0] == TRUE) {
+                    return true;
+                } else if (data[0] == FALSE) {
+                    return false;
+                } else {
+                    throw new SerializationException("Unexpected byte received by BooleanDeserializer: " + data[0]);
+                }
+            }
+        }
+    }
+}
\ No newline at end of file
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/NullableValueAndTimestampSerializer.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/NullableValueAndTimestampSerializer.java
new file mode 100644
index 00000000000..93eaca0ff21
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/NullableValueAndTimestampSerializer.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import static org.apache.kafka.streams.kstream.internals.WrappingNullableUtils.initNullableSerializer;
+import static org.apache.kafka.streams.state.internals.NullableValueAndTimestampSerde.RAW_BOOLEAN_LENGTH;
+import static org.apache.kafka.streams.state.internals.NullableValueAndTimestampSerde.RAW_TIMESTAMP_LENGTH;
+
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.Objects;
+import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.serialization.LongSerializer;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.streams.kstream.internals.WrappingNullableSerializer;
+import org.apache.kafka.streams.processor.internals.SerdeGetter;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.streams.state.internals.NullableValueAndTimestampSerde.BooleanSerde.BooleanSerializer;
+
+/**
+ * Serializer for {@link ValueAndTimestamp} instances whose value may be {@code null};
+ * the byte layout is described on {@link NullableValueAndTimestampSerde}.
+ */
+public class NullableValueAndTimestampSerializer<V> implements WrappingNullableSerializer<ValueAndTimestamp<V>, Void, V> {
+    public final Serializer<V> valueSerializer;
+    private final Serializer<Long> timestampSerializer;
+    private final Serializer<Boolean> booleanSerializer;
+
+    NullableValueAndTimestampSerializer(final Serializer<V> valueSerializer) {
+        this.valueSerializer = Objects.requireNonNull(valueSerializer);
+        this.timestampSerializer = new LongSerializer();
+        this.booleanSerializer = new BooleanSerializer();
+    }
+
+    @Override
+    public void configure(final Map<String, ?> configs, final boolean isKey) {
+        valueSerializer.configure(configs, isKey);
+        timestampSerializer.configure(configs, isKey);
+        booleanSerializer.configure(configs, isKey);
+    }
+
+    /**
+     * Writes {@code timestamp | tombstone flag | value bytes}. A value whose serializer yields
+     * {@code null} is recorded as a tombstone with an empty value section.
+     */
+    @Override
+    public byte[] serialize(final String topic, final ValueAndTimestamp<V> data) {
+        if (data == null) {
+            return null;
+        }
+
+        final byte[] serializedValue = valueSerializer.serialize(topic, data.value());
+        final boolean tombstone = serializedValue == null;
+        final byte[] serializedFlag = booleanSerializer.serialize(topic, tombstone);
+        final byte[] serializedTimestamp = timestampSerializer.serialize(topic, data.timestamp());
+        requireLength(serializedFlag, RAW_BOOLEAN_LENGTH, "Unexpected length for serialized boolean: ");
+        requireLength(serializedTimestamp, RAW_TIMESTAMP_LENGTH, "Unexpected length for serialized timestamp: ");
+
+        final byte[] valueSection = tombstone ? new byte[0] : serializedValue;
+        final ByteBuffer buffer = ByteBuffer.allocate(RAW_TIMESTAMP_LENGTH + RAW_BOOLEAN_LENGTH + valueSection.length);
+        buffer.put(serializedTimestamp);
+        buffer.put(serializedFlag);
+        buffer.put(valueSection);
+        return buffer.array();
+    }
+
+    /** Throws a SerializationException (message prefix + actual length) if the length differs. */
+    private static void requireLength(final byte[] bytes, final int expectedLength, final String messagePrefix) {
+        if (bytes.length != expectedLength) {
+            throw new SerializationException(messagePrefix + bytes.length);
+        }
+    }
+
+    @Override
+    public void close() {
+        valueSerializer.close();
+        timestampSerializer.close();
+        booleanSerializer.close();
+    }
+
+    @Override
+    public void setIfUnset(final SerdeGetter getter) {
+        // The wrapped serializer itself is never null (the constructor rejects it),
+        // but it may in turn wrap a serializer that is still unset.
+        initNullableSerializer(valueSerializer, getter);
+    }
+}
\ No newline at end of file
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/NullableValueAndTimestampSerdeTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/NullableValueAndTimestampSerdeTest.java
new file mode 100644
index 00000000000..3b5bb7fd618
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/NullableValueAndTimestampSerdeTest.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serdes.StringSerde;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.junit.Test;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.hamcrest.Matchers.nullValue;
+
+public class NullableValueAndTimestampSerdeTest {
+
+    private final static NullableValueAndTimestampSerde<String> SERDE = new NullableValueAndTimestampSerde<>(new StringSerde());
+    private final static Serializer<ValueAndTimestamp<String>> SERIALIZER = SERDE.serializer();
+    private final static Deserializer<ValueAndTimestamp<String>> DESERIALIZER = SERDE.deserializer();
+
+    @Test
+    public void shouldSerdeNull() {
+        assertThat(SERIALIZER.serialize(null, null), is(nullValue()));
+        assertThat(DESERIALIZER.deserialize(null, null), is(nullValue()));
+    }
+
+    @Test
+    public void shouldSerdeNonNull() {
+        final ValueAndTimestamp<String> valueAndTimestamp = ValueAndTimestamp.make("foo", 10L);
+
+        final byte[] rawValueAndTimestamp = SERIALIZER.serialize(null, valueAndTimestamp);
+        assertThat(rawValueAndTimestamp, is(notNullValue()));
+
+        assertThat(DESERIALIZER.deserialize(null, rawValueAndTimestamp), is(valueAndTimestamp));
+    }
+
+    @Test
+    public void shouldSerdeNonNullWithNullValue() {
+        final ValueAndTimestamp<String> valueAndTimestamp = ValueAndTimestamp.makeAllowNullable(null, 10L);
+
+        final byte[] rawValueAndTimestamp = SERIALIZER.serialize(null, valueAndTimestamp);
+        assertThat(rawValueAndTimestamp, is(notNullValue()));
+
+        assertThat(DESERIALIZER.deserialize(null, rawValueAndTimestamp), is(valueAndTimestamp));
+    }
+
+    @Test
+    public void shouldSerializeNonNullWithEmptyBytes() {
+        // empty string serializes to empty bytes
+        final ValueAndTimestamp<String> valueAndTimestamp = ValueAndTimestamp.make("", 10L);
+
+        final byte[] rawValueAndTimestamp = SERIALIZER.serialize(null, valueAndTimestamp);
+        assertThat(rawValueAndTimestamp, is(notNullValue()));
+
+        assertThat(DESERIALIZER.deserialize(null, rawValueAndTimestamp), is(valueAndTimestamp));
+    }
+
+    @Test
+    public void shouldDistinguishNullValueFromEmptyValueViaTombstoneFlag() {
+        final byte[] rawTombstone = SERIALIZER.serialize(null, ValueAndTimestamp.makeAllowNullable(null, 10L));
+        final byte[] rawEmpty = SERIALIZER.serialize(null, ValueAndTimestamp.make("", 10L));
+
+        // both consist solely of the 8-byte timestamp followed by the 1-byte tombstone flag ...
+        assertThat(rawTombstone.length, is(9));
+        assertThat(rawEmpty.length, is(9));
+        // ... so the flag byte is the only thing telling a null value apart from an empty one
+        assertThat(rawTombstone[8], is((byte) 0x01));
+        assertThat(rawEmpty[8], is((byte) 0x00));
+    }
+}
\ No newline at end of file