You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2023/02/16 02:08:00 UTC

[kafka] branch trunk updated: KAFKA-14491: [8/N] Add serdes for ValueAndTimestamp with null value (#13249)

This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new dcaf95a35f4 KAFKA-14491: [8/N] Add serdes for ValueAndTimestamp with null value (#13249)
dcaf95a35f4 is described below

commit dcaf95a35f4ba7de73433aa5a5b1a33427744e45
Author: Victoria Xia <vi...@confluent.io>
AuthorDate: Wed Feb 15 18:07:47 2023 -0800

    KAFKA-14491: [8/N] Add serdes for ValueAndTimestamp with null value (#13249)
    
    Introduces a new Serde that serializes a value and its timestamp as a single byte array, where the value may be null (in order to represent putting a tombstone with a timestamp into the versioned store).
    
    Part of KIP-889.
    
    Reviewers: Matthias J. Sax <ma...@confluent.io>
---
 gradle/spotbugs-exclude.xml                        |   7 ++
 .../kafka/streams/state/ValueAndTimestamp.java     |  14 ++-
 .../NullableValueAndTimestampDeserializer.java     | 105 +++++++++++++++++++++
 .../internals/NullableValueAndTimestampSerde.java  |  89 +++++++++++++++++
 .../NullableValueAndTimestampSerializer.java       |  92 ++++++++++++++++++
 .../NullableValueAndTimestampSerdeTest.java        |  72 ++++++++++++++
 6 files changed, 378 insertions(+), 1 deletion(-)

diff --git a/gradle/spotbugs-exclude.xml b/gradle/spotbugs-exclude.xml
index efc1f03b950..e040e54d306 100644
--- a/gradle/spotbugs-exclude.xml
+++ b/gradle/spotbugs-exclude.xml
@@ -488,6 +488,13 @@ For a detailed description of spotbugs bug categories, see https://spotbugs.read
 
     <!-- END Suppress warnings for unused members that are undetectably used by Jackson -->
 
+    <Match>
+        <!-- Boolean deserializer intentionally returns null on null input. -->
+        <Class name="org.apache.kafka.streams.state.internals.NullableValueAndTimestampSerde$BooleanSerde$BooleanDeserializer"/>
+        <Method name="deserialize"/>
+        <Bug pattern="NP_BOOLEAN_RETURN_NULL"/>
+    </Match>
+
     <Match>
         <!-- Suppress a warning about ignoring return value because this is intentional. -->
         <Class name="org.apache.kafka.common.config.AbstractConfig$ResolvingMap"/>
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/ValueAndTimestamp.java b/streams/src/main/java/org/apache/kafka/streams/state/ValueAndTimestamp.java
index 1cf727712b2..227021d6cf7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/ValueAndTimestamp.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/ValueAndTimestamp.java
@@ -31,7 +31,6 @@ public final class ValueAndTimestamp<V> {
 
     private ValueAndTimestamp(final V value,
                               final long timestamp) {
-        Objects.requireNonNull(value);
         this.value = value;
         this.timestamp = timestamp;
     }
@@ -50,6 +49,19 @@ public final class ValueAndTimestamp<V> {
         return value == null ? null : new ValueAndTimestamp<>(value, timestamp);
     }
 
+    /**
+     * Create a new {@link ValueAndTimestamp} instance. The provided {@code value} may be {@code null}.
+     *
+     * @param value      the value
+     * @param timestamp  the timestamp
+     * @param <V> the type of the value
+     * @return a new {@link ValueAndTimestamp} instance
+     */
+    public static <V> ValueAndTimestamp<V> makeAllowNullable(
+        final V value, final long timestamp) {
+        return new ValueAndTimestamp<>(value, timestamp);
+    }
+
     /**
      * Return the wrapped {@code value} of the given {@code valueAndTimestamp} parameter
      * if the parameter is not {@code null}.
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/NullableValueAndTimestampDeserializer.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/NullableValueAndTimestampDeserializer.java
new file mode 100644
index 00000000000..7d70708ff12
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/NullableValueAndTimestampDeserializer.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import static org.apache.kafka.streams.kstream.internals.WrappingNullableUtils.initNullableDeserializer;
+import static org.apache.kafka.streams.state.internals.NullableValueAndTimestampSerde.RAW_BOOLEAN_LENGTH;
+import static org.apache.kafka.streams.state.internals.NullableValueAndTimestampSerde.RAW_TIMESTAMP_LENGTH;
+
+import java.util.Map;
+import java.util.Objects;
+import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.streams.kstream.internals.WrappingNullableDeserializer;
+import org.apache.kafka.streams.processor.internals.SerdeGetter;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.streams.state.internals.NullableValueAndTimestampSerde.BooleanSerde.BooleanDeserializer;
+
+/**
+ * See {@link NullableValueAndTimestampSerde}.
+ */
+public class NullableValueAndTimestampDeserializer<V> implements WrappingNullableDeserializer<ValueAndTimestamp<V>, Void, V> {
+    public final Deserializer<V> valueDeserializer;
+    private final Deserializer<Long> timestampDeserializer;
+    private final Deserializer<Boolean> booleanDeserializer;
+
+    NullableValueAndTimestampDeserializer(final Deserializer<V> valueDeserializer) {
+        this.valueDeserializer = Objects.requireNonNull(valueDeserializer);
+        timestampDeserializer = new LongDeserializer();
+        booleanDeserializer = new BooleanDeserializer();
+    }
+
+    @Override
+    public void configure(final Map<String, ?> configs, final boolean isKey) {
+        valueDeserializer.configure(configs, isKey);
+        timestampDeserializer.configure(configs, isKey);
+        booleanDeserializer.configure(configs, isKey);
+    }
+
+    @Override
+    public ValueAndTimestamp<V> deserialize(final String topic, final byte[] rawValueAndTimestamp) {
+        if (rawValueAndTimestamp == null) {
+            return null;
+        }
+
+        final long timestamp = timestampDeserializer.deserialize(topic, rawTimestamp(rawValueAndTimestamp));
+        final boolean isTombstone = booleanDeserializer.deserialize(topic, rawIsTombstone(rawValueAndTimestamp));
+        if (isTombstone) {
+            return ValueAndTimestamp.makeAllowNullable(null, timestamp);
+        } else {
+            final V value = valueDeserializer.deserialize(topic, rawValue(rawValueAndTimestamp));
+            if (value == null) {
+                throw new SerializationException("Deserializer cannot deserialize non-null bytes as null");
+            }
+            return ValueAndTimestamp.make(value, timestamp);
+        }
+    }
+
+    @Override
+    public void close() {
+        valueDeserializer.close();
+        timestampDeserializer.close();
+        booleanDeserializer.close();
+    }
+
+    @Override
+    public void setIfUnset(final SerdeGetter getter) {
+        // NullableValueAndTimestampDeserializer never wraps a null deserializer (the constructor rejects null),
+        // but it may wrap a deserializer that itself wraps a null deserializer.
+        initNullableDeserializer(valueDeserializer, getter);
+    }
+
+    private static byte[] rawTimestamp(final byte[] rawValueAndTimestamp) {
+        final byte[] rawTimestamp = new byte[RAW_TIMESTAMP_LENGTH];
+        System.arraycopy(rawValueAndTimestamp, 0, rawTimestamp, 0, RAW_TIMESTAMP_LENGTH);
+        return rawTimestamp;
+    }
+
+    private static byte[] rawIsTombstone(final byte[] rawValueAndTimestamp) {
+        final byte[] rawIsTombstone = new byte[RAW_BOOLEAN_LENGTH];
+        System.arraycopy(rawValueAndTimestamp, RAW_TIMESTAMP_LENGTH, rawIsTombstone, 0, RAW_BOOLEAN_LENGTH);
+        return rawIsTombstone;
+    }
+
+    private static byte[] rawValue(final byte[] rawValueAndTimestamp) {
+        final int rawValueLength = rawValueAndTimestamp.length - RAW_TIMESTAMP_LENGTH - RAW_BOOLEAN_LENGTH;
+        final byte[] rawValue = new byte[rawValueLength];
+        System.arraycopy(rawValueAndTimestamp, RAW_TIMESTAMP_LENGTH + RAW_BOOLEAN_LENGTH, rawValue, 0, rawValueLength);
+        return rawValue;
+    }
+}
\ No newline at end of file
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/NullableValueAndTimestampSerde.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/NullableValueAndTimestampSerde.java
new file mode 100644
index 00000000000..090a4daa35c
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/NullableValueAndTimestampSerde.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import static java.util.Objects.requireNonNull;
+
+import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.streams.kstream.internals.WrappingNullableSerde;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+
+/**
+ * Similar to {@link ValueAndTimestampSerde} but this serde additionally supports (de)serializing
+ * {@link ValueAndTimestamp} instances for which the {@code value} is {@code null}.
+ * <p>
+ * The serialized format is:
+ * <pre>
+ *     &lt;timestamp&gt; + &lt;bool indicating whether value is null&gt; + &lt;raw value&gt;
+ * </pre>
+ * where the boolean is needed in order to distinguish between null and empty values (i.e., between
+ * tombstones and {@code byte[0]} values).
+ */
+public class NullableValueAndTimestampSerde<V> extends WrappingNullableSerde<ValueAndTimestamp<V>, Void, V> {
+
+    static final int RAW_TIMESTAMP_LENGTH = 8;
+    static final int RAW_BOOLEAN_LENGTH = 1;
+
+    public NullableValueAndTimestampSerde(final Serde<V> valueSerde) {
+        super(
+            new NullableValueAndTimestampSerializer<>(requireNonNull(valueSerde).serializer()),
+            new NullableValueAndTimestampDeserializer<>(requireNonNull(valueSerde).deserializer())
+        );
+    }
+
+    static final class BooleanSerde {
+        private static final byte TRUE = 0x01;
+        private static final byte FALSE = 0x00;
+
+        static class BooleanSerializer implements Serializer<Boolean> {
+            @Override
+            public byte[] serialize(final String topic, final Boolean data) {
+                if (data == null) {
+                    return null;
+                }
+
+                return new byte[] {
+                    data ? TRUE : FALSE
+                };
+            }
+        }
+
+        static class BooleanDeserializer implements Deserializer<Boolean> {
+            @Override
+            public Boolean deserialize(final String topic, final byte[] data) {
+                if (data == null) {
+                    return null;
+                }
+
+                if (data.length != 1) {
+                    throw new SerializationException("Size of data received by BooleanDeserializer is not 1");
+                }
+
+                if (data[0] == TRUE) {
+                    return true;
+                } else if (data[0] == FALSE) {
+                    return false;
+                } else {
+                    throw new SerializationException("Unexpected byte received by BooleanDeserializer: " + data[0]);
+                }
+            }
+        }
+    }
+}
\ No newline at end of file
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/NullableValueAndTimestampSerializer.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/NullableValueAndTimestampSerializer.java
new file mode 100644
index 00000000000..93eaca0ff21
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/NullableValueAndTimestampSerializer.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import static org.apache.kafka.streams.kstream.internals.WrappingNullableUtils.initNullableSerializer;
+import static org.apache.kafka.streams.state.internals.NullableValueAndTimestampSerde.RAW_BOOLEAN_LENGTH;
+import static org.apache.kafka.streams.state.internals.NullableValueAndTimestampSerde.RAW_TIMESTAMP_LENGTH;
+
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.Objects;
+import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.serialization.LongSerializer;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.streams.kstream.internals.WrappingNullableSerializer;
+import org.apache.kafka.streams.processor.internals.SerdeGetter;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.streams.state.internals.NullableValueAndTimestampSerde.BooleanSerde.BooleanSerializer;
+
+/**
+ * See {@link NullableValueAndTimestampSerde}.
+ */
+public class NullableValueAndTimestampSerializer<V> implements WrappingNullableSerializer<ValueAndTimestamp<V>, Void, V> {
+    public final Serializer<V> valueSerializer;
+    private final Serializer<Long> timestampSerializer;
+    private final Serializer<Boolean> booleanSerializer;
+
+    NullableValueAndTimestampSerializer(final Serializer<V> valueSerializer) {
+        this.valueSerializer = Objects.requireNonNull(valueSerializer);
+        timestampSerializer = new LongSerializer();
+        booleanSerializer = new BooleanSerializer();
+    }
+
+    @Override
+    public void configure(final Map<String, ?> configs, final boolean isKey) {
+        valueSerializer.configure(configs, isKey);
+        timestampSerializer.configure(configs, isKey);
+        booleanSerializer.configure(configs, isKey);
+    }
+
+    @Override
+    public byte[] serialize(final String topic, final ValueAndTimestamp<V> data) {
+        if (data == null) {
+            return null;
+        }
+        final byte[] rawValue = valueSerializer.serialize(topic, data.value());
+        final byte[] rawIsTombstone = booleanSerializer.serialize(topic, rawValue == null);
+        final byte[] rawTimestamp = timestampSerializer.serialize(topic, data.timestamp());
+        if (rawIsTombstone.length != RAW_BOOLEAN_LENGTH) {
+            throw new SerializationException("Unexpected length for serialized boolean: " + rawIsTombstone.length);
+        }
+        if (rawTimestamp.length != RAW_TIMESTAMP_LENGTH) {
+            throw new SerializationException("Unexpected length for serialized timestamp: " + rawTimestamp.length);
+        }
+
+        final byte[] nonNullRawValue = rawValue == null ? new byte[0] : rawValue;
+        return ByteBuffer
+            .allocate(RAW_TIMESTAMP_LENGTH + RAW_BOOLEAN_LENGTH + nonNullRawValue.length)
+            .put(rawTimestamp)
+            .put(rawIsTombstone)
+            .put(nonNullRawValue)
+            .array();
+    }
+
+    @Override
+    public void close() {
+        valueSerializer.close();
+        timestampSerializer.close();
+        booleanSerializer.close();
+    }
+
+    @Override
+    public void setIfUnset(final SerdeGetter getter) {
+        // NullableValueAndTimestampSerializer never wraps a null serializer (the constructor rejects null),
+        // but it may wrap a serializer that itself wraps a null serializer.
+        initNullableSerializer(valueSerializer, getter);
+    }
+}
\ No newline at end of file
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/NullableValueAndTimestampSerdeTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/NullableValueAndTimestampSerdeTest.java
new file mode 100644
index 00000000000..3b5bb7fd618
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/NullableValueAndTimestampSerdeTest.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serdes.StringSerde;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.junit.Test;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.hamcrest.Matchers.nullValue;
+
+public class NullableValueAndTimestampSerdeTest {
+
+    private final static NullableValueAndTimestampSerde<String> SERDE = new NullableValueAndTimestampSerde<>(new StringSerde());
+    private final static Serializer<ValueAndTimestamp<String>> SERIALIZER = SERDE.serializer();
+    private final static Deserializer<ValueAndTimestamp<String>> DESERIALIZER = SERDE.deserializer();
+
+    @Test
+    public void shouldSerdeNull() {
+        assertThat(SERIALIZER.serialize(null, null), is(nullValue()));
+        assertThat(DESERIALIZER.deserialize(null, null), is(nullValue()));
+    }
+
+    @Test
+    public void shouldSerdeNonNull() {
+        final ValueAndTimestamp<String> valueAndTimestamp = ValueAndTimestamp.make("foo", 10L);
+
+        final byte[] rawValueAndTimestamp = SERIALIZER.serialize(null, valueAndTimestamp);
+        assertThat(rawValueAndTimestamp, is(notNullValue()));
+
+        assertThat(DESERIALIZER.deserialize(null, rawValueAndTimestamp), is(valueAndTimestamp));
+    }
+
+    @Test
+    public void shouldSerdeNonNullWithNullValue() {
+        final ValueAndTimestamp<String> valueAndTimestamp = ValueAndTimestamp.makeAllowNullable(null, 10L);
+
+        final byte[] rawValueAndTimestamp = SERIALIZER.serialize(null, valueAndTimestamp);
+        assertThat(rawValueAndTimestamp, is(notNullValue()));
+
+        assertThat(DESERIALIZER.deserialize(null, rawValueAndTimestamp), is(valueAndTimestamp));
+    }
+
+    @Test
+    public void shouldSerializeNonNullWithEmptyBytes() {
+        // empty string serializes to empty bytes
+        final ValueAndTimestamp<String> valueAndTimestamp = ValueAndTimestamp.make("", 10L);
+
+        final byte[] rawValueAndTimestamp = SERIALIZER.serialize(null, valueAndTimestamp);
+        assertThat(rawValueAndTimestamp, is(notNullValue()));
+
+        assertThat(DESERIALIZER.deserialize(null, rawValueAndTimestamp), is(valueAndTimestamp));
+    }
+}
\ No newline at end of file