Posted to jira@kafka.apache.org by "vcrfxia (via GitHub)" <gi...@apache.org> on 2023/02/15 23:07:31 UTC

[GitHub] [kafka] vcrfxia commented on a diff in pull request #13249: KAFKA-14491: [8/N] Add serdes for ValueAndTimestamp with null value

vcrfxia commented on code in PR #13249:
URL: https://github.com/apache/kafka/pull/13249#discussion_r1107815109


##########
streams/src/main/java/org/apache/kafka/streams/state/internals/NullableValueAndTimestampSerde.java:
##########
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import static java.util.Objects.requireNonNull;
+
+import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.streams.kstream.internals.WrappingNullableSerde;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+
+/**
+ * Similar to {@link ValueAndTimestampSerde} but this serde additionally supports (de)serializing
+ * {@link ValueAndTimestamp} instances for which the {@code value} is {@code null}.
+ * <p>
+ * The serialized format is:
+ * <pre>
+ *     <timestamp> + <bool indicating whether value is null> + <raw value>
+ * </pre>
+ * where the boolean is needed in order to distinguish between null and empty values (i.e., between
+ * tombstones and {@code byte[0]} values).
+ */
+public class NullableValueAndTimestampSerde<V> extends WrappingNullableSerde<ValueAndTimestamp<V>, Void, V> {
+    public NullableValueAndTimestampSerde(final Serde<V> valueSerde) {
+        super(
+            new NullableValueAndTimestampSerializer<>(requireNonNull(valueSerde).serializer()),
+            new NullableValueAndTimestampDeserializer<>(requireNonNull(valueSerde).deserializer())
+        );
+    }
+
+    static final class BooleanSerde {
+        private static final byte TRUE = 0x01;
+        private static final byte FALSE = 0x00;
+
+        static class BooleanSerializer implements Serializer<Boolean> {
+            @Override
+            public byte[] serialize(final String topic, final Boolean data) {
+                if (data == null) {
+                    // actually want to return null here but spotbugs won't allow deserialization so

Review Comment:
   Resolved below.



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/NullableValueAndTimestampSerde.java:
##########
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import static java.util.Objects.requireNonNull;
+
+import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.streams.kstream.internals.WrappingNullableSerde;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+
+/**
+ * Similar to {@link ValueAndTimestampSerde} but this serde additionally supports (de)serializing
+ * {@link ValueAndTimestamp} instances for which the {@code value} is {@code null}.
+ * <p>
+ * The serialized format is:
+ * <pre>
+ *     <timestamp> + <bool indicating whether value is null> + <raw value>
+ * </pre>
+ * where the boolean is needed in order to distinguish between null and empty values (i.e., between
+ * tombstones and {@code byte[0]} values).
+ */
+public class NullableValueAndTimestampSerde<V> extends WrappingNullableSerde<ValueAndTimestamp<V>, Void, V> {
+    public NullableValueAndTimestampSerde(final Serde<V> valueSerde) {
+        super(
+            new NullableValueAndTimestampSerializer<>(requireNonNull(valueSerde).serializer()),
+            new NullableValueAndTimestampDeserializer<>(requireNonNull(valueSerde).deserializer())
+        );
+    }
+
+    static final class BooleanSerde {
+        private static final byte TRUE = 0x01;
+        private static final byte FALSE = 0x00;
+
+        static class BooleanSerializer implements Serializer<Boolean> {
+            @Override
+            public byte[] serialize(final String topic, final Boolean data) {
+                if (data == null) {
+                    // actually want to return null here but spotbugs won't allow deserialization so
+                    // we fail here during serialization too for consistency
+                    throw new SerializationException("BooleanSerializer does not support null");
+                }
+
+                return new byte[] {
+                    data ? TRUE : FALSE
+                };
+            }
+        }
+
+        static class BooleanDeserializer implements Deserializer<Boolean> {
+            @Override
+            public Boolean deserialize(final String topic, final byte[] data) {
+                if (data == null) {
+                    // actually want to return null here but spotbugs won't allow it (NP_BOOLEAN_RETURN_NULL)

Review Comment:
   OK, added a spotbugs exclusion for this particular class.
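
Assuming the exclusion mentioned here is a class-specific spotbugs exclusion for NP_BOOLEAN_RETURN_NULL (the bug pattern named in the code comment above), a Boolean (de)serializer pair could then return null for null input, roughly as in the JDK-only sketch below. The class name is hypothetical, `IllegalArgumentException` stands in for Kafka's `SerializationException`, and this is not the code that was merged in the PR.

```java
// Hypothetical, JDK-only sketch of a Boolean (de)serializer pair that passes null through.
// Returning null from a Boolean-typed method is what spotbugs reports as NP_BOOLEAN_RETURN_NULL,
// so code like this assumes a class-specific spotbugs exclusion is in place.
public class NullableBooleanSerdeSketch {
    private static final byte TRUE = 0x01;
    private static final byte FALSE = 0x00;

    public static byte[] serialize(final Boolean data) {
        if (data == null) {
            return null; // null in, null out (assumption: mirrors the deserializer below)
        }
        return new byte[] {data ? TRUE : FALSE};
    }

    public static Boolean deserialize(final byte[] data) {
        if (data == null) {
            return null; // the explicit null return that NP_BOOLEAN_RETURN_NULL flags
        }
        if (data.length != 1) {
            throw new IllegalArgumentException("Expected 1 byte but got " + data.length);
        }
        if (data[0] == TRUE) {
            return Boolean.TRUE;
        }
        if (data[0] == FALSE) {
            return Boolean.FALSE;
        }
        throw new IllegalArgumentException("Unexpected boolean byte: " + data[0]);
    }
}
```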



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/NullableValueAndTimestampDeserializer.java:
##########
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import static org.apache.kafka.streams.kstream.internals.WrappingNullableUtils.initNullableDeserializer;
+
+import java.util.Map;
+import java.util.Objects;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.streams.kstream.internals.WrappingNullableDeserializer;
+import org.apache.kafka.streams.processor.internals.SerdeGetter;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.streams.state.internals.NullableValueAndTimestampSerde.BooleanSerde.BooleanDeserializer;
+
+/**
+ * See {@link NullableValueAndTimestampSerde}.
+ */
+public class NullableValueAndTimestampDeserializer<V> implements WrappingNullableDeserializer<ValueAndTimestamp<V>, Void, V> {
+    public final Deserializer<V> valueDeserializer;
+    private final Deserializer<Long> timestampDeserializer;
+    private final Deserializer<Boolean> booleanDeserializer;
+
+    NullableValueAndTimestampDeserializer(final Deserializer<V> valueDeserializer) {
+        this.valueDeserializer = Objects.requireNonNull(valueDeserializer);
+        timestampDeserializer = new LongDeserializer();
+        booleanDeserializer = new BooleanDeserializer();
+    }
+
+    @Override
+    public void configure(final Map<String, ?> configs, final boolean isKey) {
+        valueDeserializer.configure(configs, isKey);
+        timestampDeserializer.configure(configs, isKey);
+        booleanDeserializer.configure(configs, isKey);
+    }
+
+    @Override
+    public ValueAndTimestamp<V> deserialize(final String topic, final byte[] rawValueAndTimestamp) {
+        if (rawValueAndTimestamp == null) {
+            return null;
+        }
+
+        final long timestamp = timestampDeserializer.deserialize(topic, rawTimestamp(rawValueAndTimestamp));
+        final boolean isTombstone = booleanDeserializer.deserialize(topic, rawIsTombstone(rawValueAndTimestamp));
+        if (isTombstone) {
+            return ValueAndTimestamp.makeAllowNullable(null, timestamp);
+        } else {
+            final V value = valueDeserializer.deserialize(topic, rawValue(rawValueAndTimestamp));
+            return ValueAndTimestamp.makeAllowNullable(value, timestamp);

Review Comment:
   You're correct that `value` should never be null here, assuming valid deserializer implementations which never deserialize non-null into null. 
   
   Regarding updating this call to `make()`: `make()` does not throw if `value == null`; instead, it returns `null`. In the event of a buggy deserializer implementation that returns null when it shouldn't, calling `make()` here and returning null will cause cascading failures elsewhere. For example, the changelogging layer uses this deserializer to obtain the value to write to the changelog (see https://github.com/apache/kafka/pull/13251) and will throw an exception if it gets a null ValueAndTimestamp from the deserializer.
   
   I suppose I can update this to `make()` and throw an explicit exception if `value == null`. I'll do that.
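
The approach described above (failing explicitly instead of letting a null from a buggy value deserializer propagate) could look roughly like this JDK-only sketch. `ValueAndTs` is a hypothetical stand-in for `ValueAndTimestamp`, the value deserializer is modeled as a `Function<byte[], V>`, and the offsets assume the `<timestamp> + <bool> + <raw value>` layout from the class javadoc with an 8-byte big-endian timestamp (as written by `LongSerializer`) and a 1-byte flag. The PR's own `rawTimestamp()`/`rawIsTombstone()`/`rawValue()` helpers are not shown in this thread, so the slicing here is an assumption.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.function.Function;

// Hypothetical, JDK-only sketch: decode <8-byte timestamp><1 flag byte><raw value> and throw
// if a non-tombstone payload is deserialized into null.
public class StrictDecodeSketch {

    // Hypothetical stand-in for ValueAndTimestamp: value is null only for tombstones.
    public static final class ValueAndTs<V> {
        public final V value;
        public final long timestamp;

        ValueAndTs(final V value, final long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    private static final int TIMESTAMP_SIZE = Long.BYTES; // assumed 8-byte timestamp prefix
    private static final byte TOMBSTONE = 0x01;          // assumed flag value for a null value

    public static <V> ValueAndTs<V> decode(final byte[] raw, final Function<byte[], V> valueDeserializer) {
        if (raw == null) {
            return null; // null in, null out
        }
        final ByteBuffer buffer = ByteBuffer.wrap(raw);
        final long timestamp = buffer.getLong();               // bytes 0..7 (big-endian)
        final boolean isTombstone = buffer.get() == TOMBSTONE; // byte 8
        if (isTombstone) {
            return new ValueAndTs<>(null, timestamp);
        }
        final byte[] rawValue = Arrays.copyOfRange(raw, TIMESTAMP_SIZE + 1, raw.length);
        final V value = valueDeserializer.apply(rawValue);
        if (value == null) {
            // Surface the buggy deserializer here instead of returning null and failing later
            // (for example in the changelogging layer).
            throw new IllegalStateException("Value deserializer returned null for a non-tombstone value");
        }
        return new ValueAndTs<>(value, timestamp);
    }
}
```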



##########
streams/src/test/java/org/apache/kafka/streams/state/internals/NullableValueAndTimestampSerdeTest.java:
##########
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serdes.StringSerde;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.junit.Test;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.hamcrest.Matchers.nullValue;
+
+public class NullableValueAndTimestampSerdeTest {
+
+    private final static NullableValueAndTimestampSerde<String> SERDE = new NullableValueAndTimestampSerde<>(new StringSerde());
+    private final static Serializer<ValueAndTimestamp<String>> SERIALIZER = SERDE.serializer();
+    private final static Deserializer<ValueAndTimestamp<String>> DESERIALIZER = SERDE.deserializer();
+
+    @Test
+    public void shouldSerdeNull() {
+        assertThat(SERIALIZER.serialize(null, null), is(nullValue()));
+        assertThat(DESERIALIZER.deserialize(null, null), is(nullValue()));
+    }
+
+    @Test
+    public void shouldSerdeNonNull() {
+        final ValueAndTimestamp<String> valueAndTimestamp = ValueAndTimestamp.make("foo", 10L);
+
+        final byte[] rawValueAndTimestamp = SERIALIZER.serialize(null, valueAndTimestamp);
+        assertThat(rawValueAndTimestamp, is(notNullValue()));
+
+        assertThat(DESERIALIZER.deserialize(null, rawValueAndTimestamp), is(valueAndTimestamp));
+    }
+
+    @Test
+    public void shouldSerdeNonNullWithNullValue() {
+        final ValueAndTimestamp<String> valueAndTimestamp = ValueAndTimestamp.makeAllowNullable(null, 10L);
+
+        final byte[] rawValueAndTimestamp = SERIALIZER.serialize(null, valueAndTimestamp);
+        assertThat(rawValueAndTimestamp, is(notNullValue()));
+
+        assertThat(DESERIALIZER.deserialize(null, rawValueAndTimestamp), is(valueAndTimestamp));
+    }
+
+    @Test
+    public void shouldSerializeNonNullWithEmptyBytes() {

Review Comment:
   StringSerializer serializes an empty string as `byte[0]`, which is exactly what this test case is for (`isTombstone` will be set to `false`, which is how `byte[0]` is distinguished from a null value). 
   
   Did I misunderstand your question?
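
Why the flag byte matters for this test case can be shown with a small JDK-only sketch. `serializeString` mimics how `StringSerializer` treats `null` (returns null) and `""` (returns `byte[0]`); `withoutFlag` is a hypothetical flag-less `<timestamp> + <raw value>` layout used only for contrast; and `withFlag` follows the layout in the `NullableValueAndTimestampSerde` javadoc. All names are illustrative, not part of the PR.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical, JDK-only sketch: without a flag byte, an empty string and a null value
// encode to identical bytes; with the flag byte they stay distinguishable.
public class EmptyVsNullSketch {

    // Mimics StringSerializer's behaviour: null -> null, "" -> byte[0].
    public static byte[] serializeString(final String s) {
        return s == null ? null : s.getBytes(StandardCharsets.UTF_8);
    }

    // Hypothetical flag-less layout: a null raw value is written as zero bytes.
    public static byte[] withoutFlag(final long timestamp, final byte[] rawValue) {
        final byte[] value = rawValue == null ? new byte[0] : rawValue;
        return ByteBuffer.allocate(Long.BYTES + value.length).putLong(timestamp).put(value).array();
    }

    // Layout from the javadoc: the flag byte records whether the raw value was null.
    public static byte[] withFlag(final long timestamp, final byte[] rawValue) {
        final byte[] value = rawValue == null ? new byte[0] : rawValue;
        return ByteBuffer.allocate(Long.BYTES + 1 + value.length)
            .putLong(timestamp)
            .put(rawValue == null ? (byte) 0x01 : (byte) 0x00)
            .put(value)
            .array();
    }

    public static void main(final String[] args) {
        final byte[] empty = serializeString("");
        final byte[] nullValue = serializeString(null);
        System.out.println(Arrays.equals(withoutFlag(10L, empty), withoutFlag(10L, nullValue))); // true
        System.out.println(Arrays.equals(withFlag(10L, empty), withFlag(10L, nullValue)));       // false
    }
}
```

Without the flag, both inputs collapse to the same 8 timestamp bytes; with it, the two encodings differ only in byte 8.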



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/NullableValueAndTimestampSerializer.java:
##########
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import static org.apache.kafka.streams.kstream.internals.WrappingNullableUtils.initNullableSerializer;
+
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.Objects;
+import org.apache.kafka.common.serialization.LongSerializer;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.streams.kstream.internals.WrappingNullableSerializer;
+import org.apache.kafka.streams.processor.internals.SerdeGetter;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.streams.state.internals.NullableValueAndTimestampSerde.BooleanSerde.BooleanSerializer;
+
+/**
+ * See {@link NullableValueAndTimestampSerde}.
+ */
+public class NullableValueAndTimestampSerializer<V> implements WrappingNullableSerializer<ValueAndTimestamp<V>, Void, V> {
+    public final Serializer<V> valueSerializer;
+    private final Serializer<Long> timestampSerializer;
+    private final Serializer<Boolean> booleanSerializer;
+
+    NullableValueAndTimestampSerializer(final Serializer<V> valueSerializer) {
+        this.valueSerializer = Objects.requireNonNull(valueSerializer);
+        timestampSerializer = new LongSerializer();
+        booleanSerializer = new BooleanSerializer();
+    }
+
+    @Override
+    public void configure(final Map<String, ?> configs, final boolean isKey) {
+        valueSerializer.configure(configs, isKey);
+        timestampSerializer.configure(configs, isKey);
+        booleanSerializer.configure(configs, isKey);
+    }
+
+    @Override
+    public byte[] serialize(final String topic, final ValueAndTimestamp<V> data) {
+        if (data == null) {
+            return null;
+        }
+        final byte[] rawValue = valueSerializer.serialize(topic, data.value());
+        final byte[] rawIsTombstone = booleanSerializer.serialize(topic, rawValue == null);
+        final byte[] rawTimestamp = timestampSerializer.serialize(topic, data.timestamp());
+
+        final byte[] nonNullRawValue = rawValue == null ? new byte[0] : rawValue;
+        return ByteBuffer
+            .allocate(rawTimestamp.length + rawIsTombstone.length + nonNullRawValue.length)

Review Comment:
   Yes, that's correct. I extracted the expected lengths into static variables and added the relevant checks to be defensive against serialization surprises.
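
The defensive checks described above might look roughly like the following JDK-only sketch, which mirrors the `ByteBuffer.allocate(...)` assembly in the serializer hunk. The constant names `TIMESTAMP_SIZE` and `BOOLEAN_SIZE`, the exception type, and the class name are assumptions for illustration; the variable names and checks actually added in the PR may differ.

```java
import java.nio.ByteBuffer;

// Hypothetical, JDK-only sketch of length checks around the <timestamp><flag><raw value> payload.
public class LengthCheckSketch {
    public static final int TIMESTAMP_SIZE = Long.BYTES; // 8 bytes
    public static final int BOOLEAN_SIZE = 1;            // 1 flag byte

    // Joins already-serialized pieces, rejecting any that would break the fixed-width prefix.
    public static byte[] assemble(final byte[] rawTimestamp, final byte[] rawIsTombstone, final byte[] rawValue) {
        if (rawTimestamp.length != TIMESTAMP_SIZE) {
            throw new IllegalStateException("Unexpected timestamp length: " + rawTimestamp.length);
        }
        if (rawIsTombstone.length != BOOLEAN_SIZE) {
            throw new IllegalStateException("Unexpected boolean length: " + rawIsTombstone.length);
        }
        final byte[] nonNullRawValue = rawValue == null ? new byte[0] : rawValue;
        return ByteBuffer
            .allocate(rawTimestamp.length + rawIsTombstone.length + nonNullRawValue.length)
            .put(rawTimestamp)
            .put(rawIsTombstone)
            .put(nonNullRawValue)
            .array();
    }

    // Reader-side counterpart: a payload shorter than the fixed-width prefix cannot be valid.
    public static void checkPayloadLength(final byte[] raw) {
        if (raw.length < TIMESTAMP_SIZE + BOOLEAN_SIZE) {
            throw new IllegalStateException("Payload too short: " + raw.length + " bytes");
        }
    }
}
```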



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
